Contents

etcd 的基本入门

etcd 是一个强一致性的分布式键值存储系统,可以提供可靠的分布式集群的数据访问方式。

etcd is a strongly consistent, distributed key-value store that provides a reliable way to store data that needs to be accessed by a distributed system or cluster of machines.

refer: https://etcd.io/


etcd简介

etcd 诞生于 CoreOS 公司,它最初是用于解决集群管理系统中 OS 升级的分布式并发控制服务发现集群状态存储 以及 配置文件的存储与分发等问题。基于此,etcd 被设计为提供高可用、强一致的小型 keyvalue 数据存储服务。

项目当前隶属于 CNCF 基金会,被 AWS、Google、Microsoft、Alibaba 等大型互联网公司广泛使用。

核心特性

  • 数据存储在集群中的高可用K-V存储
  • 允许应用实时监听存储中的K-V的变化
  • 能够容忍单点故障,能够应对网络分区

分布式的容灾策略是基于 鸽巢理论 ,假设一个班级有60人,我将一个秘密告诉31人,那么随便在这个班级挑出30人来,肯定至少有一个人指导这个秘密。

etcd与Raft的关系

Raft是强一致性的集群日志同步算法,etcd是一个分布式KV存储,etcd利用raft算法在集群中同步key-value 。etcd集群需要2N+1个节点。

日志一旦由 leader节点 复制到了大多数 follower节点 即可完成提交。 上图可以看到7个节点的集群,log index 为9 的数据已经同步给 a,c,d 所以index 9 属于整个集群,index为11 的几个数据leader 不认,所以这个数据不属于集群。

Raft 可以保证,给客户端承诺过的请求一定是不会丢失的 各个节点的数据一定是最终一致的

raft 算法英文动画演示

交互协议

  • HTTP 基于JSON请求,例如 curl,简单通用。
  • SDK 内置GRPC协议,性能高效

etcd 功能

refer: etcd 文档演示

  • put、get、del 操作    数据库普通的增加,查找,删除操作,如果想要更新,依旧使用put。

  • watch 监听

    支持监听某一key的变化,有变化即产生回调,另外还支持查找key 的历史版本。

  • lease 租约

  申请定时器,举例:申请一个TTL为10s的租约 lease,当 put 一个 key 时,携带该租约,当TTL到期时,key也会被删除,一个 lease id 可以关联多个key,也就是租约到期,多个key 可以被删除,想要防止被删除,可以用keepalive定期续租。

  • txn 事物、loack 分布式锁   etcd 支持将多个请求包装到一个事务中,或者添加一个分布式锁。

安装部署

  • 安装etcd
1
2
3
4
5
➜  wget https://github.com/etcd-io/etcd/releases/download/v3.4.9/etcd-v3.4.9-linux-amd64.tar.gz
➜  tar -xf etcd-v3.4.9-linux-amd64.tar.gz
➜  cd etcd-v3.4.9-linux-amd64/
➜  sudo cp etcd /usr/local/sbin/
➜  sudo cp etcdctl /usr/local/sbin/
  • 设置集群信息
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
TOKEN=token-01
CLUSTER_STATE=new
NAME_1=machine-1
NAME_2=machine-2
NAME_3=machine-3
HOST_1=127.0.0.1
HOST_2=127.0.0.1
HOST_3=127.0.0.1
CLUSTER=${NAME_1}=http://${HOST_1}:2381,${NAME_2}=http://${HOST_2}:2382,${NAME_3}=http://${HOST_3}:2383

  • 启动集群
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# For machine 1
THIS_NAME=${NAME_1}
THIS_IP=${HOST_1}
etcd --data-dir=data.etcd1 --name ${THIS_NAME} \
	--initial-advertise-peer-urls http://${THIS_IP}:2381 --listen-peer-urls http://${THIS_IP}:2381 \
	--advertise-client-urls http://${THIS_IP}:2371 --listen-client-urls http://${THIS_IP}:2371 \
	--initial-cluster ${CLUSTER} \
	--initial-cluster-state ${CLUSTER_STATE} --initial-cluster-token ${TOKEN}

# For machine 2
THIS_NAME=${NAME_2}
THIS_IP=${HOST_2}
etcd --data-dir=data.etcd2 --name ${THIS_NAME} \
	--initial-advertise-peer-urls http://${THIS_IP}:2382 --listen-peer-urls http://${THIS_IP}:2382 \
	--advertise-client-urls http://${THIS_IP}:2372 --listen-client-urls http://${THIS_IP}:2372 \
	--initial-cluster ${CLUSTER} \
	--initial-cluster-state ${CLUSTER_STATE} --initial-cluster-token ${TOKEN}

# For machine 3
THIS_NAME=${NAME_3}
THIS_IP=${HOST_3}
etcd --data-dir=data.etcd3 --name ${THIS_NAME} \
	--initial-advertise-peer-urls http://${THIS_IP}:2383 --listen-peer-urls http://${THIS_IP}:2383 \
	--advertise-client-urls http://${THIS_IP}:2373 --listen-client-urls http://${THIS_IP}:2373 \
	--initial-cluster ${CLUSTER} \
	--initial-cluster-state ${CLUSTER_STATE} --initial-cluster-token ${TOKEN}
  • 使用etcdctl 连接到etcd
1
2
3
4
5
6
7
export ETCDCTL_API=3
HOST_1=127.0.0.1
HOST_2=127.0.0.1
HOST_3=127.0.0.1
ENDPOINTS=$HOST_1:2371,$HOST_2:2372,$HOST_3:2373

etcdctl --endpoints=$ENDPOINTS member list
  • 尝试添加一个key
1
2
➜  etcdctl --endpoints=$ENDPOINTS put "key" "hello"
OK

golang 操作 etcd

github 官方示例代码:https://github.com/etcd-io/etcd/tree/master/clientv3

使用前先安装

1
go get go.etcd.io/etcd/clientv3

解决Golang1.14 etcd/clientv3报错:etcd undefined: resolver.BuildOption

refer: https://blog.csdn.net/qq_43442524/article/details/104997539

连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
var client *clientv3.Client

func Conn() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2371", "localhost:2372", "localhost:2373"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// handle error!
	}
	defer cli.Close()
	client = cli
}

简单操作 GET,PUT,DEL

refer:https://godoc.org/go.etcd.io/etcd/clientv3

etcd的操作强大在可以添加 WithOption 比如 WithPrevKV() 获取前一个Key 和 WithPrefix() 遍历

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
type EtcdApi struct {
	KV clientv3.KV
}

func (e EtcdApi) PutKey(key string, value string) error {
	ctx, _ := context.WithTimeout(context.Background(),2 * time.Second)

// clientv3.WithPrevKV() 获取到删KV之前的值
	if putResp, err := e.KV.Put(ctx,key,value,clientv3.WithPrevKV()); err != nil {
		return err
	}else {
		if putResp.PrevKv != nil {
			fmt.Println("PrevValue: ", string(putResp.PrevKv.Value))
		}
		return nil
	}
}

func (e EtcdApi) GetKey(key string) (value string, err error) {
	ctx, _ := context.WithTimeout(context.Background(),2 * time.Second)

// 可以遍历目录
	if getResp, err := e.KV.Get(ctx,key,clientv3.WithPrefix());err != nil {
		return "", err
	}else {
		return fmt.Sprintf("%s", getResp.Kvs),err
	}
}

func (e EtcdApi) DelKey(key string) (value string, err error) {
	ctx, _ := context.WithTimeout(context.Background(),2 * time.Second)

	if delResp, err := e.KV.Delete(ctx,key,clientv3.WithPrevKV());err != nil {
		return value,nil
	}else {
		return fmt.Sprintf("%s",delResp.PrevKvs),nil
	}
}


Lease 租期

  • 创建租约
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func (e EtcdApi) GrantLease() (lease clientv3.Lease, leaseId clientv3.LeaseID,err error) {
	var leaseGrantResp *clientv3.LeaseGrantResponse
	
	lease = clientv3.NewLease(e.Client)

	// 申请一个10s的租约
	if leaseGrantResp, err = lease.Grant(e.Ctx,10); err != nil {
		return 
	}

	leaseId = leaseGrantResp.ID

	// Put 一个KV, 关联租约,从而实现10s自动过期
	if putResp,err := e.KV.Put(e.Ctx,"/key1/lease","v1",clientv3.WithLease(leaseId)); err != nil {
		return 
	}else {
		fmt.Println(putResp.Header.Revision)
		return 
	}
}
  • 续租
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (e *EtcdApi) KeepAlive(leaseId clientv3.LeaseID) {
	// 自动续期
	var keepResp *clientv3.LeaseKeepAliveResponse
	keepRespChan,err := e.Lease.KeepAlive(context.TODO(), leaseId)
	if err != nil {
		log.Fatal("ERROR :", err)
	}

	go func() {
		for {
			select{
			case keepResp = <- keepRespChan:
				if keepRespChan == nil {
					fmt.Println("租约实效")
					goto END
				}else { // 每秒续租一次
					fmt.Println("续租", keepResp.ID)
				}
			}
		}
	END:
		fmt.Println("Return")
	}()
}

Watch 监听

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (e *EtcdApi) Watch() {
	// 创建一个watcher
	e.Watcher = clientv3.Watcher(e.Client)

	// 30 s 后取消监听
	ctx, cancelFunc := context.WithCancel(context.TODO())
	time.AfterFunc(30*time.Second, func() {
		cancelFunc()
	})

	// 启动监听
	watchRespChan := e.Watcher.Watch(ctx,"/key1/watcher")

	// 处理kv变化事件
	for watchResp := range watchRespChan {
		for _,event := range watchResp.Events {
			switch event.Type {
			case mvccpb.PUT: {
				fmt.Println(" 修改为: ", string(event.Kv.Value), "Revision: ", event.Kv.CreateRevision , event.Kv.ModRevision)
			}
			case mvccpb.DELETE: {
				fmt.Println(" 删除: Revision: ", event.Kv.ModRevision)
			}
			}
		}
	}
}

Operation

可以将上面提到的 GetPutDelete等操作抽象成一个Operation。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (e EtcdApi) Op() {
	// 创建一个OP:operation
	putOp := clientv3.OpPut("/key1/op","value_op")

	// 执行OP
	opResp, err := e.KV.Do(e.Ctx,putOp)
	if err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println("写入Revision: ", &opResp.Put().Header.Revision)
}

Txn

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// 处理业务
func (e *EtcdApi) DoTxn(leaseId clientv3.LeaseID) {
	// 创建事物
	txn := e.KV.Txn(e.Ctx)
	
	// 定义事物
	txn.If(clientv3.Compare(clientv3.CreateRevision("/key1/lease"),"=","0")).
		Then(clientv3.OpPut("/key1/lease","value1",clientv3.WithLease(leaseId))).
		Else(clientv3.OpGet("/key1/lease"))  //抢锁失败
	
	txnResp,err := txn.Commit()
	if err != nil {
		log.Fatal(err.Error())
	}
	
	// 是否抢到锁
	if !txnResp.Succeeded {
		fmt.Println("抢锁失败,锁被占用:" , string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
		return
	}
}

分布式锁

etcd分布式锁并不是etcd server对外提供一个功能api,而是基于etcd的各种特性(lease、watch、mvcc等)集成的一个工具。

实现思路

etcd的几种特殊的机制都可以作为分布式锁的基础。etcd的键值对可以作为锁的本体,锁的创建与删除对应键值对的创建与删除。etcd的分布式一致性以及高可用可以保证锁的高可用性。

  • prefix:由于etcd支持前缀查找,可以将锁设置成“锁名称”+“唯一id”的格式,保证锁的对称性,即每个客户端只操作自己持有的锁。

  • lease:租约机制可以为锁做一个保活操作,在创建锁的时候绑定租约,并定期进行续约,如果获得锁期间客户端意外宕机,则持有的锁会被自动删除,避免了死锁的产生。

  • Revision:etcd内部维护了一个全局的Revision值,并会随着事务的递增而递增。可以用Revision值的大小来决定获取锁的先后顺序,在上锁的时候已经决定了获取锁先后顺序,后续有客户端释放锁也不会产生惊群效应。

  • watch:watch机制可以用于监听锁的删除事件,不必使用忙轮询的方式查看是否释放了锁,更加高效。同时,在watch时候可以通过Revision来进行监听,只需要监听距离自己最近而且比自己小的一个Revision就可以做到锁的实时获取。

    etcd_distribute_lock


源码分析

在 etcd v3版本的客户端中已经有了分布式锁的实现。

实例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func main() {
    //初始化etcd客户端
	cli, _ := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:23790"},
		DialTimeout: time.Second,
	})
    //创建一个session,并根据业务情况设置锁的ttl
	s, _ := concurrency.NewSession(cli, concurrency.WithTTL(3))
	defer s.Close()
    //初始化一个锁的实例,并进行加锁解锁操作。
	mu := concurrency.NewMutex(s, "mutex-kiosk")
	if err := mu.Lock(context.TODO()); err != nil {
		log.Fatal("m lock err: ", err)
	}
    //do something
	if err := mu.Unlock(context.TODO()); err != nil {
		log.Fatal("m unlock err: ", err)
	}
}

在调用 NewSession 方法时候实际上是初始化了一个用户指定行为的租约(行为可以是指定ttl或者复用其他的 lease 等),并进行异步的 keepalive。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type Mutex struct {
	s *Session   //保存的租约相关的信息

	pfx   string //锁的名称,key的前缀
	myKey string //锁完整的key
	myRev int64  //自己的版本号
	hdr   *pb.ResponseHeader
}

func NewMutex(s *Session, pfx string) *Mutex {
	return &Mutex{s, pfx + "/", "", -1, nil}
}

NewMutex实际上创建了一个锁的数据结构,该结构可以保存一些锁的信息,入参的“mutex-kiosk”只是一个key的前缀,还有后续要创建的完整key,revision等信息。


Lock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (m *Mutex) Lock(ctx context.Context) error {
    //首先尝试获取锁
	resp, err := m.tryAcquire(ctx)
	if err != nil {
		return err
	}
	......
}

func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
	s := m.s
	client := m.s.Client()
    //完整key是前缀名称加租约ID,由于不同进程生成的不同租约,所以锁互不相同
	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    //cmp通过比较createRevision是否为0判断当前的key是不是第一次创建
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	//put会把key绑定上租约并存储
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	//get会获取当前key的值
	get := v3.OpGet(m.myKey)
	//getOwner是通过前缀来范围查找,WithFirstCreate()筛选出当前存在的最小revision对应的值
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return nil, err
	}
    //将该事务的revision赋值到锁的myRev字段
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	return resp, nil
}


在获取锁时,先通过事务操作来尝试加锁。如果key是第一次创建,则将 key 绑定租约存储,否则获取key的详细信息。getOwner 通过前缀来进行查找最小 revision ,目的是获取当前锁的持有者,如果最小 Revision 的key释放了锁,则该key会被删除,所以最小revision的key就是当前锁的持有者。

再回到主函数,目前 etcd 中存了锁相关的信息,后面会通过比较 Revision 来判断自己获得了锁还是需要等待锁,如果自己的 Revision 和 ownerKey 的 Revsion相同,说明自己就是锁的持有者。


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (m *Mutex) Lock(ctx context.Context) error {
	resp, err := m.tryAcquire(ctx)
	if err != nil {
		return err
	}
	//ownerKey就是当前持有锁的值
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
    //如果ownerKey的长度为0或者持有者的Revision与自己的Revision相同,说明自己持有锁,可以直接返回,并对共享资源进行操作
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}
	......
    //等待锁的释放
    client := m.s.Client()
	_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}
    //确保session没有过期
	gresp, werr := client.Get(ctx, m.myKey)
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}

	if len(gresp.Kvs) == 0 {
		return ErrSessionExpired
	}
	m.hdr = gresp.Header
    
    return nil 
}

waitDeletes

如果没有获得到锁,就需要等待前面的锁释放,这里主要使用到 watch 机制。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    //getOpts会通过两个Option函数获取小于传入的maxCreateRev的Revision的key集合且找出集合中最大的Revison对应的key
    //主要是用于获取前一个上锁的key,进而可以watch该key的删除事件
	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
	for {
        //get通过getOpts的动作来获取键值对
		resp, err := client.Get(ctx, pfx, getOpts...)
		if err != nil {
			return nil, err
		}
        //如果长度是0,说明key不存在,代表被删除,前面的锁已经被释放了,可以直接返回
		if len(resp.Kvs) == 0 {
			return resp.Header, nil
		}
		lastKey := string(resp.Kvs[0].Key)
        //否则通过watch监听上一个锁的删除事件
		if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
			return nil, err
		}
	}
}

func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wr v3.WatchResponse
    //通过Revsion来watch key,也就是前一个锁
	wch := client.Watch(cctx, key, v3.WithRev(rev))
	for wr = range wch {
		for _, ev := range wr.Events {
             //监听Delete事件
			if ev.Type == mvccpb.DELETE {
				return nil
			}
		}
	}
	if err := wr.Err(); err != nil {
		return err
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	return fmt.Errorf("lost watcher waiting for delete")
}

waitDeletes 正常返回后该进程会获取到锁,进入操作共享资源。


UnLock

解锁操作会直接删除对应的 kv,这会触发下一个锁的获取

1
2
3
4
5
6
7
8
9
func (m *Mutex) Unlock(ctx context.Context) error {
	client := m.s.Client()
	if _, err := client.Delete(ctx, m.myKey); err != nil {
		return err
	}
	m.myKey = "\x00"
	m.myRev = -1
	return nil
}

源码链接:https://github.com/etcd-io/etcd/blob/v3.5.1/client/v3/concurrency/mutex.go


详见:https://juejin.cn/post/7062900835038003208