etcd 是一个强一致性的分布式键值存储系统,可以提供可靠的分布式集群的数据访问方式。
etcd is a strongly consistent, distributed key-value store that provides a reliable way to store data that needs to be accessed by a distributed system or cluster of machines.
refer: https://etcd.io/
etcd简介
etcd 诞生于 CoreOS 公司,它最初是用于解决集群管理系统中 OS 升级的分布式并发控制、服务发现、集群状态存储 以及 配置文件的存储与分发等问题。基于此,etcd 被设计为提供高可用、强一致的小型 keyvalue 数据存储服务。
项目当前隶属于 CNCF 基金会,被 AWS、Google、Microsoft、Alibaba 等大型互联网公司广泛使用。
核心特性
- 数据存储在集群中的高可用K-V存储
- 允许应用实时监听存储中的K-V的变化
- 能够容忍单点故障,能够应对网络分区
分布式的容灾策略是基于 鸽巢理论 ,假设一个班级有60人,我将一个秘密告诉31人,那么随便在这个班级挑出30人来,肯定至少有一个人指导这个秘密。
etcd与Raft的关系
Raft是强一致性的集群日志同步算法,etcd是一个分布式KV存储,etcd利用raft算法在集群中同步key-value 。etcd集群需要2N+1个节点。
日志一旦由 leader节点 复制到了大多数 follower节点 即可完成提交。
上图可以看到7个节点的集群,log index 为9 的数据已经同步给 a,c,d 所以index 9 属于整个集群,index为11 的几个数据leader 不认,所以这个数据不属于集群。
Raft 可以保证,给客户端承诺过的请求一定是不会丢失的
各个节点的数据一定是最终一致的
raft 算法英文动画演示
交互协议
- HTTP 基于JSON请求,例如 curl,简单通用。
- SDK 内置GRPC协议,性能高效
etcd 功能
refer: etcd 文档演示
申请定时器,举例:申请一个TTL为10s的租约 lease,当 put 一个 key 时,携带该租约,当TTL到期时,key也会被删除,一个 lease id 可以关联多个key,也就是租约到期,多个key 可以被删除,想要防止被删除,可以用keepalive定期续租。
- txn 事物、loack 分布式锁
etcd 支持将多个请求包装到一个事务中,或者添加一个分布式锁。
安装部署
1
2
3
4
5
| ➜ wget https://github.com/etcd-io/etcd/releases/download/v3.4.9/etcd-v3.4.9-linux-amd64.tar.gz
➜ tar -xf etcd-v3.4.9-linux-amd64.tar.gz
➜ cd etcd-v3.4.9-linux-amd64/
➜ sudo cp etcd /usr/local/sbin/
➜ sudo cp etcdctl /usr/local/sbin/
|
1
2
3
4
5
6
7
8
9
10
| TOKEN=token-01
CLUSTER_STATE=new
NAME_1=machine-1
NAME_2=machine-2
NAME_3=machine-3
HOST_1=127.0.0.1
HOST_2=127.0.0.1
HOST_3=127.0.0.1
CLUSTER=${NAME_1}=http://${HOST_1}:2381,${NAME_2}=http://${HOST_2}:2382,${NAME_3}=http://${HOST_3}:2383
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| # For machine 1
THIS_NAME=${NAME_1}
THIS_IP=${HOST_1}
etcd --data-dir=data.etcd1 --name ${THIS_NAME} \
--initial-advertise-peer-urls http://${THIS_IP}:2381 --listen-peer-urls http://${THIS_IP}:2381 \
--advertise-client-urls http://${THIS_IP}:2371 --listen-client-urls http://${THIS_IP}:2371 \
--initial-cluster ${CLUSTER} \
--initial-cluster-state ${CLUSTER_STATE} --initial-cluster-token ${TOKEN}
# For machine 2
THIS_NAME=${NAME_2}
THIS_IP=${HOST_2}
etcd --data-dir=data.etcd2 --name ${THIS_NAME} \
--initial-advertise-peer-urls http://${THIS_IP}:2382 --listen-peer-urls http://${THIS_IP}:2382 \
--advertise-client-urls http://${THIS_IP}:2372 --listen-client-urls http://${THIS_IP}:2372 \
--initial-cluster ${CLUSTER} \
--initial-cluster-state ${CLUSTER_STATE} --initial-cluster-token ${TOKEN}
# For machine 3
THIS_NAME=${NAME_3}
THIS_IP=${HOST_3}
etcd --data-dir=data.etcd3 --name ${THIS_NAME} \
--initial-advertise-peer-urls http://${THIS_IP}:2383 --listen-peer-urls http://${THIS_IP}:2383 \
--advertise-client-urls http://${THIS_IP}:2373 --listen-client-urls http://${THIS_IP}:2373 \
--initial-cluster ${CLUSTER} \
--initial-cluster-state ${CLUSTER_STATE} --initial-cluster-token ${TOKEN}
|
1
2
3
4
5
6
7
| export ETCDCTL_API=3
HOST_1=127.0.0.1
HOST_2=127.0.0.1
HOST_3=127.0.0.1
ENDPOINTS=$HOST_1:2371,$HOST_2:2372,$HOST_3:2373
etcdctl --endpoints=$ENDPOINTS member list
|
1
2
| ➜ etcdctl --endpoints=$ENDPOINTS put "key" "hello"
OK
|
golang 操作 etcd
github 官方示例代码:https://github.com/etcd-io/etcd/tree/master/clientv3
使用前先安装
1
| go get go.etcd.io/etcd/clientv3
|
解决Golang1.14 etcd/clientv3报错:etcd undefined: resolver.BuildOption
refer: https://blog.csdn.net/qq_43442524/article/details/104997539
连接
1
2
3
4
5
6
7
8
9
10
11
12
13
| var client *clientv3.Client
func Conn() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2371", "localhost:2372", "localhost:2373"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
}
defer cli.Close()
client = cli
}
|
简单操作 GET,PUT,DEL
refer:https://godoc.org/go.etcd.io/etcd/clientv3
etcd的操作强大在可以添加 WithOption
比如 WithPrevKV()
获取前一个Key 和 WithPrefix()
遍历
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| type EtcdApi struct {
KV clientv3.KV
}
func (e EtcdApi) PutKey(key string, value string) error {
ctx, _ := context.WithTimeout(context.Background(),2 * time.Second)
// clientv3.WithPrevKV() 获取到删KV之前的值
if putResp, err := e.KV.Put(ctx,key,value,clientv3.WithPrevKV()); err != nil {
return err
}else {
if putResp.PrevKv != nil {
fmt.Println("PrevValue: ", string(putResp.PrevKv.Value))
}
return nil
}
}
func (e EtcdApi) GetKey(key string) (value string, err error) {
ctx, _ := context.WithTimeout(context.Background(),2 * time.Second)
// 可以遍历目录
if getResp, err := e.KV.Get(ctx,key,clientv3.WithPrefix());err != nil {
return "", err
}else {
return fmt.Sprintf("%s", getResp.Kvs),err
}
}
func (e EtcdApi) DelKey(key string) (value string, err error) {
ctx, _ := context.WithTimeout(context.Background(),2 * time.Second)
if delResp, err := e.KV.Delete(ctx,key,clientv3.WithPrevKV());err != nil {
return value,nil
}else {
return fmt.Sprintf("%s",delResp.PrevKvs),nil
}
}
|
Lease 租期
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| func (e EtcdApi) GrantLease() (lease clientv3.Lease, leaseId clientv3.LeaseID,err error) {
var leaseGrantResp *clientv3.LeaseGrantResponse
lease = clientv3.NewLease(e.Client)
// 申请一个10s的租约
if leaseGrantResp, err = lease.Grant(e.Ctx,10); err != nil {
return
}
leaseId = leaseGrantResp.ID
// Put 一个KV, 关联租约,从而实现10s自动过期
if putResp,err := e.KV.Put(e.Ctx,"/key1/lease","v1",clientv3.WithLease(leaseId)); err != nil {
return
}else {
fmt.Println(putResp.Header.Revision)
return
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| func (e *EtcdApi) KeepAlive(leaseId clientv3.LeaseID) {
// 自动续期
var keepResp *clientv3.LeaseKeepAliveResponse
keepRespChan,err := e.Lease.KeepAlive(context.TODO(), leaseId)
if err != nil {
log.Fatal("ERROR :", err)
}
go func() {
for {
select{
case keepResp = <- keepRespChan:
if keepRespChan == nil {
fmt.Println("租约实效")
goto END
}else { // 每秒续租一次
fmt.Println("续租", keepResp.ID)
}
}
}
END:
fmt.Println("Return")
}()
}
|
Watch 监听
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| func (e *EtcdApi) Watch() {
// 创建一个watcher
e.Watcher = clientv3.Watcher(e.Client)
// 30 s 后取消监听
ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(30*time.Second, func() {
cancelFunc()
})
// 启动监听
watchRespChan := e.Watcher.Watch(ctx,"/key1/watcher")
// 处理kv变化事件
for watchResp := range watchRespChan {
for _,event := range watchResp.Events {
switch event.Type {
case mvccpb.PUT: {
fmt.Println(" 修改为: ", string(event.Kv.Value), "Revision: ", event.Kv.CreateRevision , event.Kv.ModRevision)
}
case mvccpb.DELETE: {
fmt.Println(" 删除: Revision: ", event.Kv.ModRevision)
}
}
}
}
}
|
Operation
可以将上面提到的 Get
、Put
、Delete
等操作抽象成一个Operation。
1
2
3
4
5
6
7
8
9
10
11
12
13
| func (e EtcdApi) Op() {
// 创建一个OP:operation
putOp := clientv3.OpPut("/key1/op","value_op")
// 执行OP
opResp, err := e.KV.Do(e.Ctx,putOp)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("写入Revision: ", &opResp.Put().Header.Revision)
}
|
Txn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| // 处理业务
func (e *EtcdApi) DoTxn(leaseId clientv3.LeaseID) {
// 创建事物
txn := e.KV.Txn(e.Ctx)
// 定义事物
txn.If(clientv3.Compare(clientv3.CreateRevision("/key1/lease"),"=","0")).
Then(clientv3.OpPut("/key1/lease","value1",clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("/key1/lease")) //抢锁失败
txnResp,err := txn.Commit()
if err != nil {
log.Fatal(err.Error())
}
// 是否抢到锁
if !txnResp.Succeeded {
fmt.Println("抢锁失败,锁被占用:" , string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
}
|
分布式锁
etcd分布式锁并不是etcd server对外提供一个功能api,而是基于etcd的各种特性(lease、watch、mvcc等)集成的一个工具。
实现思路
etcd的几种特殊的机制都可以作为分布式锁的基础。etcd的键值对可以作为锁的本体,锁的创建与删除对应键值对的创建与删除。etcd的分布式一致性以及高可用可以保证锁的高可用性。
prefix:由于etcd支持前缀查找,可以将锁设置成“锁名称”+“唯一id”的格式,保证锁的对称性,即每个客户端只操作自己持有的锁。
lease:租约机制可以为锁做一个保活操作,在创建锁的时候绑定租约,并定期进行续约,如果获得锁期间客户端意外宕机,则持有的锁会被自动删除,避免了死锁的产生。
Revision:etcd内部维护了一个全局的Revision值,并会随着事务的递增而递增。可以用Revision值的大小来决定获取锁的先后顺序,在上锁的时候已经决定了获取锁先后顺序,后续有客户端释放锁也不会产生惊群效应。
watch:watch机制可以用于监听锁的删除事件,不必使用忙轮询的方式查看是否释放了锁,更加高效。同时,在watch时候可以通过Revision来进行监听,只需要监听距离自己最近而且比自己小的一个Revision就可以做到锁的实时获取。
源码分析
在 etcd v3版本的客户端中已经有了分布式锁的实现。
实例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| func main() {
//初始化etcd客户端
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:23790"},
DialTimeout: time.Second,
})
//创建一个session,并根据业务情况设置锁的ttl
s, _ := concurrency.NewSession(cli, concurrency.WithTTL(3))
defer s.Close()
//初始化一个锁的实例,并进行加锁解锁操作。
mu := concurrency.NewMutex(s, "mutex-kiosk")
if err := mu.Lock(context.TODO()); err != nil {
log.Fatal("m lock err: ", err)
}
//do something
if err := mu.Unlock(context.TODO()); err != nil {
log.Fatal("m unlock err: ", err)
}
}
|
在调用 NewSession 方法时候实际上是初始化了一个用户指定行为的租约(行为可以是指定ttl或者复用其他的 lease 等),并进行异步的 keepalive。
1
2
3
4
5
6
7
8
9
10
11
12
| type Mutex struct {
s *Session //保存的租约相关的信息
pfx string //锁的名称,key的前缀
myKey string //锁完整的key
myRev int64 //自己的版本号
hdr *pb.ResponseHeader
}
func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1, nil}
}
|
NewMutex实际上创建了一个锁的数据结构,该结构可以保存一些锁的信息,入参的“mutex-kiosk”只是一个key的前缀,还有后续要创建的完整key,revision等信息。
Lock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| func (m *Mutex) Lock(ctx context.Context) error {
//首先尝试获取锁
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
......
}
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
s := m.s
client := m.s.Client()
//完整key是前缀名称加租约ID,由于不同进程生成的不同租约,所以锁互不相同
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
//cmp通过比较createRevision是否为0判断当前的key是不是第一次创建
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
//put会把key绑定上租约并存储
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
//get会获取当前key的值
get := v3.OpGet(m.myKey)
//getOwner是通过前缀来范围查找,WithFirstCreate()筛选出当前存在的最小revision对应的值
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return nil, err
}
//将该事务的revision赋值到锁的myRev字段
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
return resp, nil
}
|
在获取锁时,先通过事务操作来尝试加锁。如果key是第一次创建,则将 key 绑定租约存储,否则获取key的详细信息。getOwner 通过前缀来进行查找最小 revision ,目的是获取当前锁的持有者,如果最小 Revision 的key释放了锁,则该key会被删除,所以最小revision的key就是当前锁的持有者。
再回到主函数,目前 etcd 中存了锁相关的信息,后面会通过比较 Revision 来判断自己获得了锁还是需要等待锁,如果自己的 Revision 和 ownerKey 的 Revsion相同,说明自己就是锁的持有者。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| func (m *Mutex) Lock(ctx context.Context) error {
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
//ownerKey就是当前持有锁的值
ownerKey := resp.Responses[1].GetResponseRange().Kvs
//如果ownerKey的长度为0或者持有者的Revision与自己的Revision相同,说明自己持有锁,可以直接返回,并对共享资源进行操作
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
......
//等待锁的释放
client := m.s.Client()
_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
if werr != nil {
m.Unlock(client.Ctx())
return werr
}
//确保session没有过期
gresp, werr := client.Get(ctx, m.myKey)
if werr != nil {
m.Unlock(client.Ctx())
return werr
}
if len(gresp.Kvs) == 0 {
return ErrSessionExpired
}
m.hdr = gresp.Header
return nil
}
|
waitDeletes
如果没有获得到锁,就需要等待前面的锁释放,这里主要使用到 watch 机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
//getOpts会通过两个Option函数获取小于传入的maxCreateRev的Revision的key集合且找出集合中最大的Revison对应的key
//主要是用于获取前一个上锁的key,进而可以watch该key的删除事件
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
//get通过getOpts的动作来获取键值对
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return nil, err
}
//如果长度是0,说明key不存在,代表被删除,前面的锁已经被释放了,可以直接返回
if len(resp.Kvs) == 0 {
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
//否则通过watch监听上一个锁的删除事件
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return nil, err
}
}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var wr v3.WatchResponse
//通过Revsion来watch key,也就是前一个锁
wch := client.Watch(cctx, key, v3.WithRev(rev))
for wr = range wch {
for _, ev := range wr.Events {
//监听Delete事件
if ev.Type == mvccpb.DELETE {
return nil
}
}
}
if err := wr.Err(); err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
return fmt.Errorf("lost watcher waiting for delete")
}
|
waitDeletes 正常返回后该进程会获取到锁,进入操作共享资源。
UnLock
解锁操作会直接删除对应的 kv,这会触发下一个锁的获取
1
2
3
4
5
6
7
8
9
| func (m *Mutex) Unlock(ctx context.Context) error {
client := m.s.Client()
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return nil
}
|
源码链接:https://github.com/etcd-io/etcd/blob/v3.5.1/client/v3/concurrency/mutex.go
详见:https://juejin.cn/post/7062900835038003208