背景

上周末(2019年10月20日)参加了 TUG 华南区在 Shopee 举办的第三期线下技术沙龙活动,以“不同业务场景下的数据库技术选型思路”来展开分享和探讨的。

其中刘春辉和洪超老师分享了 Shopee 的数据库技术选型思路,在分享中,大家对全局唯一 ID 还挺有疑惑的,那我们今天就来看看 TiDB 中全局唯一 ID 是怎么实现的吧。

文章最后附上活动信息


PD

Placement Driver (简称 PD) 是整个集群的管理模块,其主要工作有三个:一是存储集群的元信息(某个 Key 存储在哪个 TiKV 节点);二是对 TiKV 集群进行调度和负载均衡(如数据的迁移、Raft group leader 的迁移等);三是分配全局唯一且递增的事务 ID。

PD 的命名,来源于 Google Spanner - Spanner:Google's Globally-Distributed Database [译文]Spanner 论文

TiDB 架构图(重点看 PD 跟其他的组件的关系):

分布式 ID

  • 全局唯一性
  • 有序递增
  • 高可用

单调递增的 id 能干的事可多了,可以用来实现数据库的 MVCC,进而实现 ACID 事务,检测冲突什么的。在分布式系统中尤其重要,这个领域其实说白了就是不停在和不确定的 wall clock 作斗争… 如何用更弱的约束达到更强的一致性,我觉得单调递增的唯一 id 生成器是一个利器

1
2
3
4
作者:Ed Huang
链接:https://www.zhihu.com/question/52823076/answer/132331104
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

业界一般的技术方案

  1. 单机数据库 auto_increment;
  2. 单点批量 ID 生成服务;
  3. Redis INCR INCRBY;
  4. uuid/guid;
  5. 取当前毫秒数;
  6. Snowflake
  7. 利用 zookeeper 生成唯一 ID
  8. MongoDB 的 ObjectId

扩展技术方案

  1. 百度 UidGenerator

UidGenerator 是 Java 实现的, 基于 Snowflake 算法的唯一 ID 生成器。UidGenerator 以组件形式工作在应用项目中, 支持自定义 workerId 位数和初始化策略, 从而适用于 docker 等虚拟化环境下实例自动重启、漂移等场景。 在实现上, UidGenerator 通过借用未来时间来解决 sequence 天然存在的并发限制; 采用 RingBuffer 来缓存已生成的 UID, 并行化 UID 的生产和消费, 同时对 CacheLine 补齐,避免了由 RingBuffer 带来的硬件级「伪共享」问题. 最终单机 QPS 可达 600 万

  1. 美团 Leaf

Leaf 提供两种生成的 ID 的方式(号段模式和 snowflake 模式),你可以同时开启两种方式,也可以指定开启某种方式(默认两种方式为关闭状态)。 在4C8G VM基础上,通过公司RPC方式调用,QPS压测结果近5w/s,TP999 1ms。


PD 分配 ID 的场景

  • 生成 Cluster ID
    • 防止用户多个集群配错了,或者重新部署时数据没清干净
  • 生成唯一 ID
    • RegionID,StoreID,PeerID,etc.

我们接下来就来看看他们的源码实现吧。

生成 Cluster ID

pd 在 startServer() 的时候调用 initClusterID() 可以初始化 Cluster ID。

首先会从 etcd 中读取,取不到,则重新生成。

生成 Cluster ID 的算法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Generate a random cluster ID.
ts := uint64(time.Now().Unix())
clusterID := (ts << 32) + uint64(rand.Uint32())
value := typeutil.Uint64ToBytes(clusterID)

// Multiple PDs may try to init the cluster ID at the same time.
// Only one PD can commit this transaction, then other PDs can get
// the committed cluster ID.
resp, err := c.Txn(ctx).
	If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
	Then(clientv3.OpPut(key, string(value))).
	Else(clientv3.OpGet(key)).
	Commit()

// Txn commits ok, return the generated cluster ID.
if resp.Succeeded {
	return clusterID, nil
}

解析分配唯一 ID 的过程

1. pd service 的定义

首先,我们可以通过 kvproto/proto/pdpb.proto 来查看 pd 所定义的服务有哪些。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
service PD {
	...
	rpc AllocID(AllocIDRequest) returns (AllocIDResponse) {}
	...
}

message AllocIDRequest {
    RequestHeader header = 1;
}

message AllocIDResponse {
    ResponseHeader header = 1;

    uint64 id = 2;
}

message RequestHeader {
    // cluster_id is the ID of the cluster which be sent to.
    uint64 cluster_id = 1;
}

message ResponseHeader {
    // cluster_id is the ID of the cluster which sent the response.
    uint64 cluster_id = 1;
    Error error = 2;
}
enum ErrorType {
    OK = 0;
    UNKNOWN = 1;
    NOT_BOOTSTRAPPED = 2;
    STORE_TOMBSTONE = 3;
    ALREADY_BOOTSTRAPPED = 4;
    INCOMPATIBLE_VERSION = 5;
    REGION_NOT_FOUND = 6;
}

message Error {
    ErrorType type = 1;
    string message = 2;
}

如果你对 protobuf 还不太了解的话,可以点击前往,咱们这里就不做详细阐述了。

2. AllocID service 的实现。

怎么查找到 AllocID 的实现呢?

检索:AllocID,然后结合我们的 pdpb.proto service 定义,我们就可以对照出结果。

实现代码在:grpc_service.go#L141

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// AllocID implements gRPC PDServer.
func (s *Server) AllocID(ctx context.Context, request *pdpb.AllocIDRequest) (*pdpb.AllocIDResponse, error) {
	// 1. 校验请求数据,这里是在 pdpb.proto 中封装成 RequestHeader
	if err := s.validateRequest(request.GetHeader()); err != nil {
		return nil, err
	}

	// 2. 我们用一个 idAllocator 来分配 ID
	// We can use an allocator for all types ID allocation.
	id, err := s.idAllocator.Alloc()
	if err != nil {
		return nil, status.Errorf(codes.Unknown, err.Error())
	}

	// 3. 返回请求时的 header 和生成的 ID
	return &pdpb.AllocIDResponse{
		Header: s.header(),
		Id:     id,
	}, nil
}

3. 初始化 idAllocator

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Server is the pd server.
type Server struct {

	// Server services.
	// for id allocator, we can use one allocator for
	// store, region and peer, because we just need
	// a unique ID.
	idAllocator *id.AllocatorImpl
}

func (s *Server) startServer() error {
}

idAllocator 是在 server.Run 的时候,调用 startServer 时初始化的。

1
2
3
func (s *Server) startServer() error {
	s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue())
}

4. 源码阅读 AllocatorImpl

我们根据上一步,可以看到在 pd/server/id/id.go 声明了一个 interface:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Allocator is the allocator to generate unique ID.
type Allocator interface {
	Alloc() (uint64, error)
}
// 步长 1000
const allocStep = uint64(1000)

// AllocatorImpl 是对 Allocator 的实现,用于分配 ID
// AllocatorImpl is used to allocate ID.
type AllocatorImpl struct {
	mu   sync.Mutex
	base uint64
	end  uint64

	// etcd client
	client   *clientv3.Client
	rootPath string
	member   string
}

// NewAllocatorImpl creates a new IDAllocator.
func NewAllocatorImpl(client *clientv3.Client, rootPath string, member string) *AllocatorImpl {
	return &AllocatorImpl{client: client, rootPath: rootPath, member: member}
}

4. Alloc

基本逻辑是:

在 generate() 时会从 etcd 中载入之前持久化的已经发过的 id 作为起点。然后执行一次持久化,将起始 id + allocStep 保存下来。 [id, id + allocStep) 的区间就是缓存。客户端请求时,下发的 id 都是从这个缓存中取的。所以,对于高并发的应用,配置一个大的缓存区间可以获取更高的性能。比如将 allocStep 设为 5000,平均发出 5000 个号才需要持久化一次。

如果出现 pd 服务中断的话,重启启动时会从 etcd 中重新载入配置。(etcd 为高可用)

Alloc ID 的代码,加上注释 66 行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Alloc returns a new id.
func (alloc *AllocatorImpl) Alloc() (uint64, error) {
	// 给分配增加锁,使用 defer 在函数结束时进行释放
	alloc.mu.Lock()
	defer alloc.mu.Unlock()

	// 第一次的时候 base 和 end 都为 0,所以会执行 generate()
	// 否则直接返回 alloc.base++
	if alloc.base == alloc.end {
		end, err := alloc.generate()
		if err != nil {
			return 0, err
		}

		alloc.end = end
		alloc.base = alloc.end - allocStep
	}

	alloc.base++

	return alloc.base, nil
}

func (alloc *AllocatorImpl) generate() (uint64, error) {
	// 获取要给 XXPath 分配 ID 的 key
	key := alloc.getAllocIDPath()
	// 从 etcd 中读取 key 所对应的值
	value, err := etcdutil.GetValue(alloc.client, key)
	if err != nil {
		return 0, err
	}

	var (
		cmp clientv3.Cmp
		end uint64
	)

	if value == nil {
		// create the key
		cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
	} else {
		// update the key
		end, err = typeutil.BytesToUint64(value)
		if err != nil {
			return 0, err
		}

		cmp = clientv3.Compare(clientv3.Value(key), "=", string(value))
	}
	// 如果以前不存在,则 end 被赋值为 1000(分配的步长),否则,就是原有的值+步长
	end += allocStep
	// 将 uint64 转为 bytes
	value = typeutil.Uint64ToBytes(end)
	// 从 etcd 获取一个事务,然后将值提交到 etcd 中
	txn := kv.NewSlowLogTxn(alloc.client)
	leaderPath := path.Join(alloc.rootPath, "leader")
	t := txn.If(append([]clientv3.Cmp{cmp}, clientv3.Compare(clientv3.Value(leaderPath), "=", alloc.member))...)
	resp, err := t.Then(clientv3.OpPut(key, string(value))).Commit()
	if err != nil {
		return 0, err
	}
	if !resp.Succeeded {
		return 0, errors.New("generate id failed, we may not leader")
	}

	log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end))
	idGauge.WithLabelValues("idalloc").Set(float64(end))
	return end, nil
}

func (alloc *AllocatorImpl) getAllocIDPath() string {
	return path.Join(alloc.rootPath, "alloc_id")
}

etcd 中事务是原子执行的,只支持 if … then … else … 这种表达,能实现一些有意思的场景。

其他的一些 Alloc 调用

  • pd/server/cluster.go#AllocPeer()
  • pd/server/cluster_worker.go#handleAskSplit()
  • pd/server/grpc_service.go#AllocID()
  • pd/table/namespace_classifier.go#CreateNamespace()

主要的调用逻辑代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
newRegionID, err := c.s.idAllocator.Alloc()

...

peerIDs := make([]uint64, len(request.Region.Peers))
for i := 0; i < len(peerIDs); i++ {
	if peerIDs[i], err = c.s.idAllocator.Alloc(); err != nil {
		return nil, err
	}
}

相关统计

pd 在启动时,调用 metricutil.Push(&cfg.Metric) 即可开启 prometheus 的上报客户端,默认情况下:每 15 秒上报一次。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// prometheusPushClient pushs metrics to Prometheus Pushgateway.
func prometheusPushClient(job, addr string, interval time.Duration) {
	for {
		err := push.FromGatherer(
			job, push.HostnameGroupingKey(),
			addr,
			prometheus.DefaultGatherer,
		)
		if err != nil {
			log.Error("could not push metrics to Prometheus Pushgateway", zap.Error(err))
		}

		time.Sleep(interval)
	}
}

// Push metircs in background.
func Push(cfg *MetricConfig) {
	if cfg.PushInterval.Duration == zeroDuration || len(cfg.PushAddress) == 0 {
		log.Info("disable Prometheus push client")
		return
	}

	log.Info("start Prometheus push client")

	interval := cfg.PushInterval.Duration
	go prometheusPushClient(cfg.PushJob, cfg.PushAddress, interval)
}

每成功重新 generate() 一次的时候,就会上报一次 prometheus。

idGauge.WithLabelValues("idalloc").Set(float64(end))

高可用容灾

  1. 可以参考美团《Leaf 高可用容灾》
  2. 可以参见有赞《如何做一个靠谱的发号器》

参考资料

  1. TiDB 整体架构
  2. Placement Driver 功能介绍
  3. 介绍 PD Google Slides
  4. TiDB 中的 TSO
  5. 常见分布式全局唯一 ID 生成策略及算法的对比
  6. 阿里 P8 架构师谈:分布式系统全局唯一 ID 简介、特点、5 种生成方式
  7. 全局唯一 ID 在分布式系统中用来做什么用?
  8. etcd v3 客户端用法
  9. 如何做一个靠谱的发号器
  10. 高并发分布式系统唯一 ID 生成
  11. 微信序列号生成器架构设计及演变
  12. etcd 性能表现(官方)

附 TUG 华南区 Shopee 深圳第三期线下活动信息

https://www.huodongxing.com/event/3513772048600

活动的详细安排

后续 https://asktug.com 也会将活动实录整理好后放出来,有兴趣的小伙伴们可以关注。


茶歇驿站

一个可以让你停下来看一看,在茶歇之余给你帮助的小站,这里的内容主要是后端技术,个人管理,团队管理,以及其他个人杂想。