MIT 6.824 (2020) Raft Lab 4A

Contents

  • Preface
  • 1. Overview
    • 1.1 Architecture diagram
    • 1.2 Architecture details
  • 2. Client
  • 3. Master server
    • 3.1 Fields
    • 3.2 Constructor
    • 3.3 Join/Move/Leave
    • 3.4 Query
    • 3.5 serverMonitor
    • 3.6 Load balance
  • 4. Summary

Preface

This post is part of my walkthrough of MIT 6.824 (2020). I finished Lab 4A and passed its tests. For the earlier Raft-based labs, see Lab 2A, Lab 2B, Lab 2C, Lab 3A, and Lab 3B.

Lab 4A is about sharding the database: the client sends requests to the server, and the server routes each request type to a different DB (replica group) to read or update data. The core of the lab is implementing the master's Join/Leave/Move operations on the DB server groups, plus the client's Query of the server's configs.

There are three points worth paying attention to in this lab:

  1. the load-balance algorithm
  2. configs must be deep-copied rather than shared by reference (see the sketch after this list)
  3. JoinArgs, LeaveArgs, MoveArgs, and QueryArgs must be registered with labgob
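
A minimal sketch of what point 2 means, using a hypothetical helper copyConfig (the Config struct is introduced in section 1.2, and the real code does this inside addNewConfig in section 3.5): assigning a Config copies the Shards array by value, but the Groups map is still shared, so the previous config would be mutated through it.

func copyConfig(c Config) Config {
	dup := Config{Num: c.Num, Shards: c.Shards, Groups: make(map[int][]string, len(c.Groups))}
	// the Groups map must be duplicated entry by entry,
	// otherwise both configs point at the same underlying map
	for gid, servers := range c.Groups {
		dup.Groups[gid] = append([]string{}, servers...)
	}
	return dup
}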

1. Overview

1.1 Architecture diagram

The lab improves performance by sharding the database, with the master responsible for the shard assignment. The rough architecture is as follows.

[architecture diagram]

1.2 Architecture details

First, the number of shards is fixed at 10 (so at most 10 groups can actually be assigned shards), as the Common file shows:

// The number of shards.
const NShards = 10

Second, the master is itself a cluster of servers, so Raft is used to keep the shard configuration consistent. From the Config struct you can see there are three fields to maintain: Num is the config number, Shards is the shard assignment (initially [0,0,0,0,0,0,0,0,0,0]), and Groups records the Raft servers of each replica group, e.g. Group 1 -> servers [a, b, c].

type Config struct { 
	Num    int              // config number
	Shards [NShards]int     // shard -> gid
	Groups map[int][]string // gid -> servers[]
}

For example, Shards starts as [0,0,0,0,0,0,0,0,0,0]. After joining Group 1 -> servers [a, b, c], the system has one group and Shards becomes [1,1,1,1,1,1,1,1,1,1]. After then joining Group 2 -> servers [e, f, g], the system has two groups, and the 10 shards should be split as evenly as possible between them, i.e. [1,1,1,1,1,2,2,2,2,2].

This is where the load-balance algorithm comes in. My implementation follows an approach I found online and does not use any fancier data structure such as a heap.

2. Client

The client is very similar to the kvraft client; you can mostly copy it over.

The Clerk struct and its constructor:

type Clerk struct {
	servers []*labrpc.ClientEnd

	mu         sync.Mutex
	leaderId   int   // index of the server currently believed to be the leader
	clientId   int64 // unique id of this client
	sequenceId int64 // monotonically increasing request id, used for de-duplication

	// Your data here.
}
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk { 
	ck := new(Clerk)
	ck.servers = servers
	// Your code here.
	ck.clientId = nrand()
	return ck
}
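
nrand() comes from the lab skeleton (it also appears in the kvraft client) and roughly generates a random 62-bit client id with crypto/rand and math/big:

func nrand() int64 {
	// requires the crypto/rand and math/big imports
	max := big.NewInt(int64(1) << 62)
	bigx, _ := rand.Int(rand.Reader, max)
	return bigx.Int64()
}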

Add some new fields to the RPC argument structs, mainly ClientId and SequenceId:

type JoinArgs struct { 
	Servers    map[int][]string // new GID -> servers mappings
	ClientId   int64
	SequenceId int64
}

type LeaveArgs struct { 
	GIDs       []int
	ClientId   int64
	SequenceId int64
}

type MoveArgs struct { 
	Shard      int
	GID        int
	ClientId   int64
	SequenceId int64
}

type QueryArgs struct { 
	Num        int // desired config number
	ClientId   int64
	SequenceId int64
}
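
For reference, the matching reply structs are part of the lab skeleton in common.go and look roughly like this; Err is just a string type and OK is the success value used here:

const OK = "OK"

type Err string

type JoinReply struct {
	WrongLeader bool
	Err         Err
}

type LeaveReply struct {
	WrongLeader bool
	Err         Err
}

type MoveReply struct {
	WrongLeader bool
	Err         Err
}

type QueryReply struct {
	WrongLeader bool
	Err         Err
	Config      Config
}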

The client-side implementations of Join/Move/Leave/Query; Join/Move/Leave are basically identical:

func (ck *Clerk) Query(num int) Config { 
	args := &QueryArgs{ }
	// Your code here.
	args.Num = num
	args.ClientId = ck.clientId
	args.SequenceId = atomic.AddInt64(&ck.sequenceId, 1)

	for { 
		reply := QueryReply{ }
		if ck.servers[ck.currentLeader()].Call("ShardMaster.Query", args, &reply) && !reply.WrongLeader { 
			return reply.Config
		}
		ck.leaderId = ck.changeLeader()
		time.Sleep(100 * time.Millisecond)
	}
}

func (ck *Clerk) Join(servers map[int][]string) { 
	args := &JoinArgs{ }
	args.Servers = servers
	args.ClientId = ck.clientId
	args.SequenceId = atomic.AddInt64(&ck.sequenceId, 1)

	for { 
		reply := JoinReply{ }
		if ck.servers[ck.currentLeader()].Call("ShardMaster.Join", args, &reply) && !reply.WrongLeader { 
			return
		}
		ck.leaderId = ck.changeLeader()
		time.Sleep(100 * time.Millisecond)
	}
}
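
currentLeader() and changeLeader() are not shown in this post; a minimal sketch of what they might look like, assuming they simply read and rotate ck.leaderId under the mutex (the names follow the calls in the code above):

func (ck *Clerk) currentLeader() int {
	ck.mu.Lock()
	defer ck.mu.Unlock()
	return ck.leaderId
}

func (ck *Clerk) changeLeader() int {
	ck.mu.Lock()
	defer ck.mu.Unlock()
	// round-robin to the next server and let the caller store it
	return (ck.leaderId + 1) % len(ck.servers)
}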

3. Master server

3.1 Fields

This is again very similar to the kvraft server; it needs two maps, requestMapper and sequenceMapper.


type ShardMaster struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg

	// Your data here.
	dead    int32
	configs []Config // indexed by config num

	requestMapper  map[int]chan Op // log index -> channel used to wake the waiting RPC handler
	sequenceMapper map[int64]int64 // clientId -> last applied SequenceId, for de-duplication
}

type Op struct { 
	// Your data here.
	SequenceId int64
	ClientId   int64
	OpType     string
	OpArgs     interface{ }
	Index      int
	Term       int
}

type joinLeaveMoveReply struct { 
	WrongLeader bool
	Err         Err
}

const (
	JOIN  string = "Join"
	LEAVE string = "Leave"
	MOVE  string = "Move"
	QUERY string = "Query"
)

3.2 Constructor

One very important point here: every custom struct must be registered with labgob, otherwise you will hit an "interface{} is nil" error when the Op is decoded.

func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister) *ShardMaster { 
	sm := new(ShardMaster)
	sm.me = me

	sm.configs = make([]Config, 1)
	sm.configs[0].Groups = map[int][]string{ }

	labgob.Register(Op{ })
	labgob.Register(JoinArgs{ })
	labgob.Register(LeaveArgs{ })
	labgob.Register(MoveArgs{ })
	labgob.Register(QueryArgs{ })
	sm.applyCh = make(chan raft.ApplyMsg)
	sm.rf = raft.Make(servers, me, persister, sm.applyCh)

	// Your code here.
	sm.sequenceMapper = make(map[int64]int64)
	sm.requestMapper = make(map[int]chan Op)

	go sm.serverMonitor()

	return sm
}
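
The dead flag goes with a Kill/killed pair. The skeleton's Kill only needs to stop the Raft instance; the killed check is the same pattern as in kvraft, roughly:

// Kill is called by the tester when a ShardMaster instance is no longer needed.
func (sm *ShardMaster) Kill() {
	atomic.StoreInt32(&sm.dead, 1) // requires the sync/atomic import
	sm.rf.Kill()
}

func (sm *ShardMaster) killed() bool {
	return atomic.LoadInt32(&sm.dead) == 1
}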

3.3 Join/Move/Leave

Join/Move/Leave go through Raft to guarantee consistency, and their implementations are basically identical:

  1. call rf.Start to hand the op to Raft
  2. wait for the applyMsg notification, compare the term, and return success if they match

func (sm *ShardMaster) Join(args *JoinArgs, reply *JoinReply) { 
	var isLeader bool
	clientOp := Op{ OpType: JOIN, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader { 
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) Leave(args *LeaveArgs, reply *LeaveReply) { 
	var isLeader bool
	clientOp := Op{ OpType: LEAVE, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader { 
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) Move(args *MoveArgs, reply *MoveReply) { 
	var isLeader bool
	clientOp := Op{ OpType: MOVE, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader { 
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) joinLeaveMove(clientOp Op) joinLeaveMoveReply { 
	reply := joinLeaveMoveReply{ }
	ch := sm.getChannel(clientOp.Index)
	defer func() { 
		sm.mu.Lock()
		delete(sm.requestMapper, clientOp.Index)
		sm.mu.Unlock()
	}()

	timer := time.NewTicker(2000 * time.Millisecond)
	defer timer.Stop()
	select { 
	case op := <-ch:
		sm.mu.Lock()
		opTerm := op.Term
		sm.mu.Unlock()
		if clientOp.Term != opTerm { 
			reply.WrongLeader = true
		} else { 
			reply.Err = OK
		}
	case <-timer.C:
		reply.WrongLeader = true
	}
	return reply
}
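
getChannel is not shown above; a minimal sketch, assuming it lazily creates a buffered channel per log index under the mutex so that the send in serverMonitor never blocks when no handler is waiting:

func (sm *ShardMaster) getChannel(index int) chan Op {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	ch, ok := sm.requestMapper[index]
	if !ok {
		// buffered with capacity 1 so serverMonitor can send without blocking
		ch = make(chan Op, 1)
		sm.requestMapper[index] = ch
	}
	return ch
}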

3.4 Query

Query has to return a bit more, namely the Config itself. Per the lab requirements, if the requested Num is -1 or at least the number of known configs, return the latest config; otherwise return the config with that Num.

func (sm *ShardMaster) Query(args *QueryArgs, reply *QueryReply) { 
	// Your code here.
	DPrintf("[%v] Query is called", sm.me)
	var isLeader bool
	clientOp := Op{ OpType: QUERY, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader { 
		reply.WrongLeader = true
		return
	}

	DPrintf("Query [%v] leader is found", sm.me)

	ch := sm.getChannel(clientOp.Index)
	defer func() { 
		sm.mu.Lock()
		delete(sm.requestMapper, clientOp.Index)
		sm.mu.Unlock()
	}()

	timer := time.NewTicker(2000 * time.Millisecond)
	defer timer.Stop()
	select { 
	case op := <-ch:
		DPrintf("[%v] QUERY receive op", sm.me)
		sm.mu.Lock()
		opTerm := op.Term
		sm.mu.Unlock()
		DPrintf("[%v] QUERY clientOp.Term[%v] vs opTerm[%v]", sm.me, clientOp.Term, opTerm)
		if clientOp.Term != opTerm { 
			reply.WrongLeader = true
		} else { 
			DPrintf("[%v] QUERY args.Num=%v sm.config=%v", sm.me, args.Num, sm.configs)
			sm.mu.Lock()
			reply.Err = OK
			if args.Num >= 0 && args.Num < len(sm.configs) { 
				reply.Config = sm.configs[args.Num]
			} else { 
				reply.Config = sm.configs[len(sm.configs)-1]
			}
			sm.mu.Unlock()
		}
	case <-timer.C:
		reply.WrongLeader = true
	}
}

3.5 serverMonitor

This goroutine monitors Raft's applyCh. If the op is a Join or Leave, the shard-to-group assignment has to be rebalanced. addNewConfig is important here: it deep-copies the previous config, which avoids the errors caused by sharing the Groups map by reference.

func (sm *ShardMaster) serverMonitor() { 
	for { 
		if sm.killed() { 
			return
		}
		select { 
		case msg := <-sm.applyCh:
			if msg.IsSnapshot || !msg.CommandValid { 
				continue
			}

			index := msg.CommandIndex
			term := msg.CommandTerm
			op := msg.Command.(Op)
			sm.mu.Lock()
			sequenceInMapper, hasSequence := sm.sequenceMapper[op.ClientId]
			op.Term = term
			if !hasSequence || op.SequenceId > sequenceInMapper { 
				switch op.OpType { 
				case JOIN:
					newConfig := sm.addNewConfig()
					joinArgs := op.OpArgs.(JoinArgs)
					for i, servers := range joinArgs.Servers { 
						newConfig.Groups[i] = servers
						sm.balanceLoad(&newConfig, i, JOIN)
					}
					sm.configs = append(sm.configs, newConfig)
				case LEAVE:
					newConfig := sm.addNewConfig()
					leaveArgs := op.OpArgs.(LeaveArgs)
					for _, gid := range leaveArgs.GIDs { 
						delete(newConfig.Groups, gid)
						sm.balanceLoad(&newConfig, gid, LEAVE)
					}
					sm.configs = append(sm.configs, newConfig)
				case MOVE:
					moveArgs := op.OpArgs.(MoveArgs)
					newConfig := sm.addNewConfig()
					// only append a new config when the target group exists;
					// a bare return here would exit serverMonitor while holding the lock
					if _, isExists := newConfig.Groups[moveArgs.GID]; isExists {
						newConfig.Shards[moveArgs.Shard] = moveArgs.GID
						sm.configs = append(sm.configs, newConfig)
					}
				}
				sm.sequenceMapper[op.ClientId] = op.SequenceId
			}
			sm.mu.Unlock()
			sm.getChannel(index) <- op
		}
	}
}

func (sm *ShardMaster) addNewConfig() Config { 
	lastConfig := sm.configs[len(sm.configs)-1]
	nextConfig := Config{ Num: lastConfig.Num + 1, Shards: lastConfig.Shards, Groups: make(map[int][]string)}
	for gid, servers := range lastConfig.Groups { 
		nextConfig.Groups[gid] = append([]string{ }, servers...)
	}
	return nextConfig
}

3.6 Load balance

For Join, repeatedly hand a shard owned by the group with the most shards to the new group.
For example, starting from [1,1,1,1,1,2,2,2,2,2], when group 3 joins the assignment evolves as
[1,1,1,1,1,2,2,2,2,2] ->
[3,1,1,1,1,2,2,2,2,2] ->
[3,1,1,1,1,3,2,2,2,2] ->
[3,3,1,1,1,3,2,2,2,2]

Leave is the reverse: each shard of the leaving group is handed to the group that currently owns the fewest shards. For example, starting from [3,3,1,1,1,3,2,2,2,2], removing group 3 proceeds as
[3,3,1,1,1,3,2,2,2,2] ->
[1,3,1,1,1,3,2,2,2,2] ->
[1,1,1,1,1,3,2,2,2,2] ->
[1,1,1,1,1,2,2,2,2,2]

func (sm *ShardMaster) balanceLoad(c *Config, gid int, request string) { 
	shardsMap := groupByGid(c)
	switch request { 
	case JOIN:
		totalGroups := len(c.Groups)
		newShardNum := NShards / totalGroups
		for i := 0; i < newShardNum; i++ { 
			maxGid := getMaxShardGid(shardsMap)
			c.Shards[shardsMap[maxGid][0]] = gid
			shardsMap[maxGid] = shardsMap[maxGid][1:]
		}
	case LEAVE:
		shardsList, isExists := shardsMap[gid]
		if !isExists { 
			return
		}
		delete(shardsMap, gid)
		if len(c.Groups) == 0 { 
			c.Shards = [NShards]int{ }
			return
		}
		for _, value := range shardsList { 
			minGid := getMinShardGid(shardsMap)
			c.Shards[value] = minGid
			shardsMap[minGid] = append(shardsMap[minGid], value)
		}
	}
}

Helper functions: groupByGid groups the shards by gid, producing a map[gid] -> []shard. For example, [3,3,1,1,1,3,2,2,2,2] becomes
map[1] -> [2,3,4]
map[2] -> [6,7,8,9]
map[3] -> [0,1,5]

func groupByGid(c *Config) map[int][]int { 
	shardsMap := map[int][]int{ }
	for k := range c.Groups { 
		shardsMap[k] = []int{ }
	}
	for index, gid := range c.Shards { 
		shardsMap[gid] = append(shardsMap[gid], index)
	}

	return shardsMap
}

func getMaxShardGid(shardsMap map[int][]int) int { 
	max := -1
	gid := -1
	for index, shards := range shardsMap { 
		if max < len(shards) { 
			max = len(shards)
			gid = index
		}
	}
	return gid
}

func getMinShardGid(shardsMap map[int][]int) int { 
	min := NShards
	gid := -1
	for index, shards := range shardsMap { 
		if min > len(shards) { 
			min = len(shards)
			gid = index
		}
	}
	return gid
}
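
As a quick sanity check of the Join path (a hypothetical test, not part of the lab's test suite): starting from the two-group assignment in the example above and joining group 3, it should end up owning NShards/3 = 3 shards.

func TestBalanceLoadJoin(t *testing.T) {
	sm := &ShardMaster{} // balanceLoad does not touch any ShardMaster state
	c := Config{
		Num:    2,
		Shards: [NShards]int{1, 1, 1, 1, 1, 2, 2, 2, 2, 2},
		Groups: map[int][]string{1: {"a"}, 2: {"b"}},
	}
	c.Groups[3] = []string{"c"} // group 3 joins
	sm.balanceLoad(&c, 3, JOIN)

	counts := map[int]int{}
	for _, gid := range c.Shards {
		counts[gid]++
	}
	if counts[3] != 3 {
		t.Fatalf("expected group 3 to own 3 shards, got %d", counts[3])
	}
}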

4. Summary

The hard part of this lab is the load-balance algorithm. Using a heap is recommended; that should give the lowest time complexity.
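
If you do want the heap approach, a rough sketch using container/heap could look like this (the types and function names are illustrative, not from the lab code): each heap entry tracks a group and its shards, and Join repeatedly steals one shard from the most loaded group in O(log n) per move.

// groupLoad pairs a gid with the shards it currently owns.
type groupLoad struct {
	gid    int
	shards []int
}

// maxLoadHeap pops the group owning the most shards first.
type maxLoadHeap []groupLoad

func (h maxLoadHeap) Len() int            { return len(h) }
func (h maxLoadHeap) Less(i, j int) bool  { return len(h[i].shards) > len(h[j].shards) }
func (h maxLoadHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *maxLoadHeap) Push(x interface{}) { *h = append(*h, x.(groupLoad)) }
func (h *maxLoadHeap) Pop() interface{} {
	old := *h
	item := old[len(old)-1]
	*h = old[:len(old)-1]
	return item
}

// joinWithHeap returns the shards handed to a newly joined group,
// always stealing from the currently most loaded group.
func joinWithHeap(h *maxLoadHeap, totalGroups int) []int {
	heap.Init(h) // requires the container/heap import
	moved := []int{}
	for i := 0; i < NShards/totalGroups; i++ {
		donor := heap.Pop(h).(groupLoad)
		moved = append(moved, donor.shards[0])
		donor.shards = donor.shards[1:]
		heap.Push(h, donor)
	}
	return moved
}

For example, joinWithHeap(&maxLoadHeap{{1, []int{0, 1, 2, 3, 4}}, {2, []int{5, 6, 7, 8, 9}}}, 3) hands three shards to the new group (ties may be broken in a different order), matching the [3,3,1,1,1,3,2,2,2,2] example above.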

Original post: https://blog.csdn.net/Joshmo/article/details/111955297
