Compare commits
10 Commits
958a4d45b7
...
9d4d24dc3e
| Author | SHA1 | Date | |
|---|---|---|---|
| 9d4d24dc3e | |||
|
|
fa6877b02d | ||
|
|
d21b9ffe74 | ||
|
|
059b435561 | ||
|
|
c96bad59a5 | ||
|
|
8743e4fb99 | ||
|
|
bb597de980 | ||
|
|
8f21c11cfc | ||
|
|
05dc50ab27 | ||
|
|
7ba7e45595 |
@ -32,7 +32,7 @@ func main() {
|
|||||||
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
|
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
|
||||||
p, err := plugin.Open(filename)
|
p, err := plugin.Open(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("cannot load plugin %v", filename)
|
log.Fatalf("cannot load plugin %v: %s", filename, err)
|
||||||
}
|
}
|
||||||
xmapf, err := p.Lookup("Map")
|
xmapf, err := p.Lookup("Map")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -5,11 +5,42 @@ import "net"
|
|||||||
import "os"
|
import "os"
|
||||||
import "net/rpc"
|
import "net/rpc"
|
||||||
import "net/http"
|
import "net/http"
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
type status uint8
|
||||||
|
|
||||||
|
const (
|
||||||
|
idle status = iota
|
||||||
|
progress
|
||||||
|
completed
|
||||||
|
)
|
||||||
|
|
||||||
type Coordinator struct {
|
type Coordinator struct {
|
||||||
// Your definitions here.
|
// Your definitions here.
|
||||||
|
mapJobStatus map[string]status
|
||||||
|
reduceJobStatus map[string]status
|
||||||
|
|
||||||
|
workerStatus map[int]status
|
||||||
|
|
||||||
|
intermediateFiles map[string][]string
|
||||||
|
|
||||||
|
nReduce int
|
||||||
|
|
||||||
|
workerCount int
|
||||||
|
|
||||||
|
m sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Coordinator) areMapJobsDone() bool {
|
||||||
|
done := true
|
||||||
|
|
||||||
|
for _, job := range c.mapJobStatus {
|
||||||
|
if job != completed {
|
||||||
|
done = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return done
|
||||||
}
|
}
|
||||||
|
|
||||||
// Your code here -- RPC handlers for the worker to call.
|
// Your code here -- RPC handlers for the worker to call.
|
||||||
@ -24,6 +55,54 @@ func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Coordinator) Register(args *RegisterArgs, reply *RegisterReply) error {
|
||||||
|
c.m.Lock()
|
||||||
|
defer c.m.Unlock()
|
||||||
|
|
||||||
|
reply.WorkerId = c.workerCount
|
||||||
|
c.workerCount += 1
|
||||||
|
|
||||||
|
c.workerStatus[reply.WorkerId] = idle
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Coordinator) GetJob(args *GetJobArgs, reply *GetJobReply) error {
|
||||||
|
c.m.Lock()
|
||||||
|
c.m.Unlock()
|
||||||
|
|
||||||
|
if !c.areMapJobsDone() {
|
||||||
|
for filename, jobStatus := range c.mapJobStatus {
|
||||||
|
if jobStatus == idle {
|
||||||
|
var job MapJob
|
||||||
|
job.InputFile = filename
|
||||||
|
job.ReducerCount = c.nReduce
|
||||||
|
c.mapJobStatus[filename] = progress
|
||||||
|
|
||||||
|
reply.Finished = false
|
||||||
|
reply.JobType = Map
|
||||||
|
reply.MapJob = job
|
||||||
|
|
||||||
|
c.workerStatus[args.WorkerId] = progress
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Coordinator) Finish(args *MapResult, reply *FinishReply) error {
|
||||||
|
c.m.Lock()
|
||||||
|
defer c.m.Unlock()
|
||||||
|
|
||||||
|
c.intermediateFiles[args.InputFile] = args.IntermediateFiles
|
||||||
|
c.mapJobStatus[args.InputFile] = completed
|
||||||
|
c.workerStatus[args.WorkerId] = idle
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// start a thread that listens for RPCs from worker.go
|
// start a thread that listens for RPCs from worker.go
|
||||||
@ -63,7 +142,15 @@ func MakeCoordinator(files []string, nReduce int) *Coordinator {
|
|||||||
c := Coordinator{}
|
c := Coordinator{}
|
||||||
|
|
||||||
// Your code here.
|
// Your code here.
|
||||||
|
c.mapJobStatus = make(map[string]status)
|
||||||
|
c.intermediateFiles = make(map[string][]string)
|
||||||
|
c.workerStatus = make(map[int]status)
|
||||||
|
c.nReduce = nReduce
|
||||||
|
|
||||||
|
for _, v := range files {
|
||||||
|
c.mapJobStatus[v] = idle
|
||||||
|
c.intermediateFiles[v] = make([]string, nReduce)
|
||||||
|
}
|
||||||
|
|
||||||
c.server()
|
c.server()
|
||||||
return &c
|
return &c
|
||||||
|
|||||||
@ -24,6 +24,54 @@ type ExampleReply struct {
|
|||||||
|
|
||||||
// Add your RPC definitions here.
|
// Add your RPC definitions here.
|
||||||
|
|
||||||
|
type JobType uint8
|
||||||
|
const (
|
||||||
|
Map JobType = iota
|
||||||
|
Reduce
|
||||||
|
)
|
||||||
|
|
||||||
|
type MapJob struct {
|
||||||
|
//Index int
|
||||||
|
InputFile string
|
||||||
|
ReducerCount int
|
||||||
|
//IntermediateFiles []string
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReduceJob struct {
|
||||||
|
ReducerNumber int
|
||||||
|
IntermediateFiles []string
|
||||||
|
}
|
||||||
|
|
||||||
|
type MapResult struct {
|
||||||
|
InputFile string
|
||||||
|
IntermediateFiles []string
|
||||||
|
WorkerId int
|
||||||
|
}
|
||||||
|
|
||||||
|
type RegisterArgs struct {
|
||||||
|
// empty
|
||||||
|
}
|
||||||
|
|
||||||
|
type RegisterReply struct {
|
||||||
|
WorkerId int
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetJobArgs struct {
|
||||||
|
WorkerId int
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetJobReply struct {
|
||||||
|
JobType JobType
|
||||||
|
|
||||||
|
MapJob MapJob
|
||||||
|
ReduceJob ReduceJob
|
||||||
|
|
||||||
|
Finished bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type FinishReply struct {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// Cook up a unique-ish UNIX-domain socket name
|
// Cook up a unique-ish UNIX-domain socket name
|
||||||
// in /var/tmp, for the coordinator.
|
// in /var/tmp, for the coordinator.
|
||||||
|
|||||||
103
src/mr/worker.go
103
src/mr/worker.go
@ -4,7 +4,9 @@ import "fmt"
|
|||||||
import "log"
|
import "log"
|
||||||
import "net/rpc"
|
import "net/rpc"
|
||||||
import "hash/fnv"
|
import "hash/fnv"
|
||||||
|
import "os"
|
||||||
|
import "io"
|
||||||
|
import "encoding/json"
|
||||||
|
|
||||||
//
|
//
|
||||||
// Map functions return a slice of KeyValue.
|
// Map functions return a slice of KeyValue.
|
||||||
@ -24,20 +26,88 @@ func ihash(key string) int {
|
|||||||
return int(h.Sum32() & 0x7fffffff)
|
return int(h.Sum32() & 0x7fffffff)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var id int
|
||||||
|
|
||||||
//
|
//
|
||||||
// main/mrworker.go calls this function.
|
// main/mrworker.go calls this function.
|
||||||
//
|
//
|
||||||
func Worker(mapf func(string, string) []KeyValue,
|
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
|
||||||
reducef func(string, []string) string) {
|
|
||||||
|
|
||||||
// Your worker implementation here.
|
// Your worker implementation here.
|
||||||
|
|
||||||
|
registerReply := Register()
|
||||||
|
id = registerReply.WorkerId
|
||||||
|
|
||||||
|
done := false
|
||||||
|
|
||||||
|
for !done {
|
||||||
|
reply := GetJob(GetJobArgs{id})
|
||||||
|
|
||||||
|
if reply.Finished {
|
||||||
|
done = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if reply.JobType == Map {
|
||||||
|
doMap(mapf, reply.MapJob)
|
||||||
|
} else if reply.JobType == Reduce {
|
||||||
|
doReduce(reducef, reply.ReduceJob)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// uncomment to send the Example RPC to the coordinator.
|
// uncomment to send the Example RPC to the coordinator.
|
||||||
// CallExample()
|
// CallExample()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func doMap(mapf func(string, string) []KeyValue, job MapJob) {
|
||||||
|
file, err := os.Open(job.InputFile)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("[Error] Could not open file %s", job.InputFile)
|
||||||
|
}
|
||||||
|
|
||||||
|
content, err := io.ReadAll(file)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("[Error] Could not read file %s", job.InputFile)
|
||||||
|
}
|
||||||
|
file.Close()
|
||||||
|
|
||||||
|
kva := mapf(job.InputFile, string(content))
|
||||||
|
|
||||||
|
sort.Sort(ByKey(kva))
|
||||||
|
|
||||||
|
partitions := make([][]KeyValue, job.ReducerCount)
|
||||||
|
|
||||||
|
for _, v := range kva {
|
||||||
|
i := ihash(v.Key) % job.ReducerCount
|
||||||
|
partitions[i] = append(partitions[i], v)
|
||||||
|
}
|
||||||
|
|
||||||
|
intermediateFiles := make([]string, job.ReducerCount)
|
||||||
|
|
||||||
|
for i, v := range partitions {
|
||||||
|
filename := fmt.Sprintf("mr-%v-%v.txt", job.Index, i)
|
||||||
|
file, err := os.Open(filename);
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("[Error] Could not open file %s", filename)
|
||||||
|
}
|
||||||
|
|
||||||
|
enc := json.NewEncoder(file)
|
||||||
|
for _, kv := range v {
|
||||||
|
err := enc.Encode(&kv)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("[Error] Could not write to file %s", filename)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
intermediateFiles[i] = filename
|
||||||
|
|
||||||
|
file.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
Finish(MapResult{job.InputFile, intermediateFiles, id})
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// example function to show how to make an RPC call to the coordinator.
|
// example function to show how to make an RPC call to the coordinator.
|
||||||
//
|
//
|
||||||
@ -65,6 +135,33 @@ func CallExample() {
|
|||||||
} else {
|
} else {
|
||||||
fmt.Printf("call failed!\n")
|
fmt.Printf("call failed!\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
call("Coordinatior.Wrong", &args, &reply);
|
||||||
|
}
|
||||||
|
|
||||||
|
func Register() RegisterReply {
|
||||||
|
var args RegisterArgs
|
||||||
|
|
||||||
|
var reply RegisterReply
|
||||||
|
|
||||||
|
call("Coordinator.Register", &args, &reply)
|
||||||
|
return reply
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetJob(args GetJobArgs) GetJobReply {
|
||||||
|
var reply GetJobReply
|
||||||
|
|
||||||
|
call("Coordinator.GetJob", &args, &reply)
|
||||||
|
|
||||||
|
return reply
|
||||||
|
}
|
||||||
|
|
||||||
|
func Finish(args MapResult) FinishReply {
|
||||||
|
var reply FinishReply
|
||||||
|
|
||||||
|
call("Coordinator.Finish", &args, &reply)
|
||||||
|
|
||||||
|
return reply
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|||||||
@ -1,31 +0,0 @@
|
|||||||
package lock
|
|
||||||
|
|
||||||
import (
|
|
||||||
|
|
||||||
"6.5840/kvsrv1"
|
|
||||||
"6.5840/kvsrv1/rpc"
|
|
||||||
"6.5840/shardkv1/shardctrler/param"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
type Lock struct {
|
|
||||||
ck *kvsrv.Clerk
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use l as the key to store the "lock state" (you would have to decide
|
|
||||||
// precisely what the lock state is).
|
|
||||||
func MakeLock(ck kvtest.IKVClerk, l string) *Lock {
|
|
||||||
lk := &Lock{ck: ck.(*kvsrv.Clerk)}
|
|
||||||
// You may add code here
|
|
||||||
return lk
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
func (lk *Lock) Acquire() {
|
|
||||||
// You may add code here.
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lk *Lock) Release() {
|
|
||||||
// You may add code here.
|
|
||||||
}
|
|
||||||
@ -1,89 +0,0 @@
|
|||||||
package lock
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
// "log"
|
|
||||||
"strconv"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"6.5840/kvsrv1"
|
|
||||||
"6.5840/kvsrv1/rpc"
|
|
||||||
"6.5840/kvtest1"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
NACQUIRE = 10
|
|
||||||
NCLNT = 10
|
|
||||||
NSEC = 2
|
|
||||||
)
|
|
||||||
|
|
||||||
func oneClient(t *testing.T, me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
|
|
||||||
lk := MakeLock(ck, "l")
|
|
||||||
ck.Put("l0", "", 0)
|
|
||||||
for i := 1; true; i++ {
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
return kvtest.ClntRes{i, 0}
|
|
||||||
default:
|
|
||||||
lk.Acquire()
|
|
||||||
|
|
||||||
// log.Printf("%d: acquired lock", me)
|
|
||||||
|
|
||||||
b := strconv.Itoa(me)
|
|
||||||
val, ver, err := ck.Get("l0")
|
|
||||||
if err == rpc.OK {
|
|
||||||
if val != "" {
|
|
||||||
t.Fatalf("%d: two clients acquired lock %v", me, val)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
t.Fatalf("%d: get failed %v", me, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ck.Put("l0", string(b), ver)
|
|
||||||
if !(err == rpc.OK || err == rpc.ErrMaybe) {
|
|
||||||
t.Fatalf("%d: put failed %v", me, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
err = ck.Put("l0", "", ver+1)
|
|
||||||
if !(err == rpc.OK || err == rpc.ErrMaybe) {
|
|
||||||
t.Fatalf("%d: put failed %v", me, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// log.Printf("%d: release lock", me)
|
|
||||||
|
|
||||||
lk.Release()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return kvtest.ClntRes{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run test clients
|
|
||||||
func runClients(t *testing.T, nclnt int, reliable bool) {
|
|
||||||
ts := kvsrv.MakeTestKV(t, reliable)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
|
|
||||||
ts.Begin(fmt.Sprintf("Test: %d lock clients", nclnt))
|
|
||||||
|
|
||||||
ts.SpawnClientsAndWait(nclnt, NSEC*time.Second, func(me int, myck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
|
|
||||||
return oneClient(t, me, myck, done)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOneClientReliable(t *testing.T) {
|
|
||||||
runClients(t, 1, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestManyClientsReliable(t *testing.T) {
|
|
||||||
runClients(t, NCLNT, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOneClientUnreliable(t *testing.T) {
|
|
||||||
runClients(t, 1, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestManyClientsUnreliable(t *testing.T) {
|
|
||||||
runClients(t, NCLNT, false)
|
|
||||||
}
|
|
||||||
@ -1,7 +0,0 @@
|
|||||||
package param
|
|
||||||
|
|
||||||
const (
|
|
||||||
// Length of time that a lease is valid. If a client doesn't
|
|
||||||
// refresh the lease within this time, the lease will expire.
|
|
||||||
LEASETIMESEC = 3
|
|
||||||
)
|
|
||||||
@ -6,10 +6,7 @@ package shardctrler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"6.5840/kvsrv1"
|
"6.5840/kvsrv1"
|
||||||
"6.5840/kvsrv1/rpc"
|
|
||||||
"6.5840/kvtest1"
|
"6.5840/kvtest1"
|
||||||
"6.5840/shardkv1/shardcfg"
|
"6.5840/shardkv1/shardcfg"
|
||||||
"6.5840/tester1"
|
"6.5840/tester1"
|
||||||
@ -22,14 +19,13 @@ type ShardCtrler struct {
|
|||||||
kvtest.IKVClerk
|
kvtest.IKVClerk
|
||||||
|
|
||||||
killed int32 // set by Kill()
|
killed int32 // set by Kill()
|
||||||
leases bool
|
|
||||||
|
|
||||||
// Your data here.
|
// Your data here.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a ShardCltler, which stores its state in a kvsrv.
|
// Make a ShardCltler, which stores its state in a kvsrv.
|
||||||
func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler {
|
func MakeShardCtrler(clnt *tester.Clnt) *ShardCtrler {
|
||||||
sck := &ShardCtrler{clnt: clnt, leases: leases}
|
sck := &ShardCtrler{clnt: clnt}
|
||||||
srv := tester.ServerName(tester.GRP0, 0)
|
srv := tester.ServerName(tester.GRP0, 0)
|
||||||
sck.IKVClerk = kvsrv.MakeClerk(clnt, srv)
|
sck.IKVClerk = kvsrv.MakeClerk(clnt, srv)
|
||||||
// Your code here.
|
// Your code here.
|
||||||
@ -38,20 +34,15 @@ func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler {
|
|||||||
|
|
||||||
// The tester calls InitController() before starting a new
|
// The tester calls InitController() before starting a new
|
||||||
// controller. In part A, this method doesn't need to do anything. In
|
// controller. In part A, this method doesn't need to do anything. In
|
||||||
// B and C, this method implements recovery (part B) and uses a lock
|
// B and C, this method implements recovery.
|
||||||
// to become leader (part C).
|
|
||||||
func (sck *ShardCtrler) InitController() {
|
func (sck *ShardCtrler) InitController() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// The tester calls ExitController to exit a controller. In part B and
|
|
||||||
// C, release lock.
|
|
||||||
func (sck *ShardCtrler) ExitController() {
|
|
||||||
}
|
|
||||||
|
|
||||||
// Called once by the tester to supply the first configuration. You
|
// Called once by the tester to supply the first configuration. You
|
||||||
// can marshal ShardConfig into a string using shardcfg.String(), and
|
// can marshal ShardConfig into a string using shardcfg.String(), and
|
||||||
// then Put it in the kvsrv for the controller at version 0. You can
|
// then Put it in the kvsrv for the controller at version 0. You can
|
||||||
// pick the key to name the configuration.
|
// pick the key to name the configuration. The initial configuration
|
||||||
|
// lists shardgrp shardcfg.Gid1 for all shards.
|
||||||
func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
|
func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
|
||||||
// Your code here
|
// Your code here
|
||||||
}
|
}
|
||||||
@ -61,25 +52,13 @@ func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
|
|||||||
// changes the configuration it may be superseded by another
|
// changes the configuration it may be superseded by another
|
||||||
// controller.
|
// controller.
|
||||||
func (sck *ShardCtrler) ChangeConfigTo(new *shardcfg.ShardConfig) {
|
func (sck *ShardCtrler) ChangeConfigTo(new *shardcfg.ShardConfig) {
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tester "kills" shardctrler by calling Kill(). For your
|
|
||||||
// convenience, we also supply isKilled() method to test killed in
|
|
||||||
// loops.
|
|
||||||
func (sck *ShardCtrler) Kill() {
|
|
||||||
atomic.StoreInt32(&sck.killed, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sck *ShardCtrler) isKilled() bool {
|
|
||||||
z := atomic.LoadInt32(&sck.killed)
|
|
||||||
return z == 1
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Return the current configuration and its version number
|
|
||||||
func (sck *ShardCtrler) Query() (*shardcfg.ShardConfig, rpc.Tversion) {
|
|
||||||
// Your code here.
|
// Your code here.
|
||||||
return nil, 0
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Return the current configuration
|
||||||
|
func (sck *ShardCtrler) Query() *shardcfg.ShardConfig {
|
||||||
|
// Your code here.
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2,7 +2,6 @@ package shardgrp
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
||||||
|
|
||||||
"6.5840/kvsrv1/rpc"
|
"6.5840/kvsrv1/rpc"
|
||||||
"6.5840/shardkv1/shardcfg"
|
"6.5840/shardkv1/shardcfg"
|
||||||
"6.5840/tester1"
|
"6.5840/tester1"
|
||||||
@ -11,7 +10,7 @@ import (
|
|||||||
type Clerk struct {
|
type Clerk struct {
|
||||||
clnt *tester.Clnt
|
clnt *tester.Clnt
|
||||||
servers []string
|
servers []string
|
||||||
leader int // last successful leader (index into servers[])
|
// You will have to modify this struct.
|
||||||
}
|
}
|
||||||
|
|
||||||
func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk {
|
func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk {
|
||||||
@ -19,24 +18,27 @@ func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk {
|
|||||||
return ck
|
return ck
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ck *Clerk) Get(key string, n shardcfg.Tnum) (string, rpc.Tversion, rpc.Err) {
|
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
|
||||||
// Your code here
|
// Your code here
|
||||||
return "", 0, ""
|
return "", 0, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ck *Clerk) Put(key string, value string, version rpc.Tversion, n shardcfg.Tnum) (bool, rpc.Err) {
|
func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {
|
||||||
// Your code here
|
// Your code here
|
||||||
return false, ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ck *Clerk) Freeze(s shardcfg.Tshid, num shardcfg.Tnum) ([]byte, rpc.Err) {
|
func (ck *Clerk) FreezeShard(s shardcfg.Tshid, num shardcfg.Tnum) ([]byte, rpc.Err) {
|
||||||
|
// Your code here
|
||||||
return nil, ""
|
return nil, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ck *Clerk) InstallShard(s shardcfg.Tshid, state []byte, num shardcfg.Tnum) rpc.Err {
|
func (ck *Clerk) InstallShard(s shardcfg.Tshid, state []byte, num shardcfg.Tnum) rpc.Err {
|
||||||
|
// Your code here
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ck *Clerk) Delete(s shardcfg.Tshid, num shardcfg.Tnum) rpc.Err {
|
func (ck *Clerk) DeleteShard(s shardcfg.Tshid, num shardcfg.Tnum) rpc.Err {
|
||||||
|
// Your code here
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,11 +14,12 @@ import (
|
|||||||
|
|
||||||
|
|
||||||
type KVServer struct {
|
type KVServer struct {
|
||||||
gid tester.Tgid
|
|
||||||
me int
|
me int
|
||||||
dead int32 // set by Kill()
|
dead int32 // set by Kill()
|
||||||
rsm *rsm.RSM
|
rsm *rsm.RSM
|
||||||
|
gid tester.Tgid
|
||||||
|
|
||||||
|
// Your code here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -37,17 +38,17 @@ func (kv *KVServer) Restore(data []byte) {
|
|||||||
// Your code here
|
// Your code here
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kv *KVServer) Get(args *shardrpc.GetArgs, reply *rpc.GetReply) {
|
func (kv *KVServer) Get(args *rpc.GetArgs, reply *rpc.GetReply) {
|
||||||
// Your code here
|
// Your code here
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kv *KVServer) Put(args *shardrpc.PutArgs, reply *rpc.PutReply) {
|
func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) {
|
||||||
// Your code here
|
// Your code here
|
||||||
}
|
}
|
||||||
|
|
||||||
// Freeze the specified shard (i.e., reject future Get/Puts for this
|
// Freeze the specified shard (i.e., reject future Get/Puts for this
|
||||||
// shard) and return the key/values stored in that shard.
|
// shard) and return the key/values stored in that shard.
|
||||||
func (kv *KVServer) Freeze(args *shardrpc.FreezeArgs, reply *shardrpc.FreezeReply) {
|
func (kv *KVServer) FreezeShard(args *shardrpc.FreezeShardArgs, reply *shardrpc.FreezeShardReply) {
|
||||||
// Your code here
|
// Your code here
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,7 +58,7 @@ func (kv *KVServer) InstallShard(args *shardrpc.InstallShardArgs, reply *shardrp
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Delete the specified shard.
|
// Delete the specified shard.
|
||||||
func (kv *KVServer) Delete(args *shardrpc.DeleteShardArgs, reply *shardrpc.DeleteShardReply) {
|
func (kv *KVServer) DeleteShard(args *shardrpc.DeleteShardArgs, reply *shardrpc.DeleteShardReply) {
|
||||||
// Your code here
|
// Your code here
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,9 +87,9 @@ func (kv *KVServer) killed() bool {
|
|||||||
func StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService {
|
func StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService {
|
||||||
// call labgob.Register on structures you want
|
// call labgob.Register on structures you want
|
||||||
// Go's RPC library to marshall/unmarshall.
|
// Go's RPC library to marshall/unmarshall.
|
||||||
labgob.Register(shardrpc.PutArgs{})
|
labgob.Register(rpc.PutArgs{})
|
||||||
labgob.Register(shardrpc.GetArgs{})
|
labgob.Register(rpc.GetArgs{})
|
||||||
labgob.Register(shardrpc.FreezeArgs{})
|
labgob.Register(shardrpc.FreezeShardArgs{})
|
||||||
labgob.Register(shardrpc.InstallShardArgs{})
|
labgob.Register(shardrpc.InstallShardArgs{})
|
||||||
labgob.Register(shardrpc.DeleteShardArgs{})
|
labgob.Register(shardrpc.DeleteShardArgs{})
|
||||||
labgob.Register(rsm.Op{})
|
labgob.Register(rsm.Op{})
|
||||||
@ -97,5 +98,6 @@ func StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, p
|
|||||||
kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv)
|
kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv)
|
||||||
|
|
||||||
// Your code here
|
// Your code here
|
||||||
|
|
||||||
return []tester.IService{kv, kv.rsm.Raft()}
|
return []tester.IService{kv, kv.rsm.Raft()}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,26 +5,12 @@ import (
|
|||||||
"6.5840/shardkv1/shardcfg"
|
"6.5840/shardkv1/shardcfg"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Same as Put in kvsrv1/rpc, but with a configuration number
|
type FreezeShardArgs struct {
|
||||||
type PutArgs struct {
|
|
||||||
Key string
|
|
||||||
Value string
|
|
||||||
Version rpc.Tversion
|
|
||||||
Num shardcfg.Tnum
|
|
||||||
}
|
|
||||||
|
|
||||||
// Same as Get in kvsrv1/rpc, but with a configuration number.
|
|
||||||
type GetArgs struct {
|
|
||||||
Key string
|
|
||||||
Num shardcfg.Tnum
|
|
||||||
}
|
|
||||||
|
|
||||||
type FreezeArgs struct {
|
|
||||||
Shard shardcfg.Tshid
|
Shard shardcfg.Tshid
|
||||||
Num shardcfg.Tnum
|
Num shardcfg.Tnum
|
||||||
}
|
}
|
||||||
|
|
||||||
type FreezeReply struct {
|
type FreezeShardReply struct {
|
||||||
State []byte
|
State []byte
|
||||||
Num shardcfg.Tnum
|
Num shardcfg.Tnum
|
||||||
Err rpc.Err
|
Err rpc.Err
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
package shardkv
|
package shardkv
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
//"log"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -9,7 +9,6 @@ import (
|
|||||||
"6.5840/kvtest1"
|
"6.5840/kvtest1"
|
||||||
"6.5840/shardkv1/shardcfg"
|
"6.5840/shardkv1/shardcfg"
|
||||||
"6.5840/shardkv1/shardctrler"
|
"6.5840/shardkv1/shardctrler"
|
||||||
"6.5840/shardkv1/shardctrler/param"
|
|
||||||
"6.5840/tester1"
|
"6.5840/tester1"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -28,7 +27,7 @@ func TestInitQuery5A(t *testing.T) {
|
|||||||
defer ts.Cleanup()
|
defer ts.Cleanup()
|
||||||
|
|
||||||
// Make a shard controller
|
// Make a shard controller
|
||||||
sck := shardctrler.MakeShardCtrler(ts.Config.MakeClient(), ts.leases)
|
sck := shardctrler.MakeShardCtrler(ts.Config.MakeClient())
|
||||||
|
|
||||||
// Make an empty shard configuration
|
// Make an empty shard configuration
|
||||||
scfg := shardcfg.MakeShardConfig()
|
scfg := shardcfg.MakeShardConfig()
|
||||||
@ -41,9 +40,9 @@ func TestInitQuery5A(t *testing.T) {
|
|||||||
sck.InitConfig(scfg)
|
sck.InitConfig(scfg)
|
||||||
|
|
||||||
// Read the initial configuration and check it
|
// Read the initial configuration and check it
|
||||||
cfg, v := sck.Query()
|
cfg := sck.Query()
|
||||||
if v != 1 || cfg.Num != 1 || cfg.Shards[0] != shardcfg.Gid1 {
|
if cfg.Num != 1 || cfg.Shards[0] != shardcfg.Gid1 {
|
||||||
ts.t.Fatalf("Static wrong %v %v", cfg, v)
|
ts.t.Fatalf("Static wrong %v", cfg)
|
||||||
}
|
}
|
||||||
cfg.CheckConfig(t, []tester.Tgid{shardcfg.Gid1})
|
cfg.CheckConfig(t, []tester.Tgid{shardcfg.Gid1})
|
||||||
}
|
}
|
||||||
@ -68,7 +67,7 @@ func TestStaticOneShardGroup5A(t *testing.T) {
|
|||||||
|
|
||||||
// disconnect raft leader of shardgrp and check that keys are
|
// disconnect raft leader of shardgrp and check that keys are
|
||||||
// still available
|
// still available
|
||||||
ts.disconnectClntFromLeader(ck.(*kvtest.TestClerk).Clnt, shardcfg.Gid1)
|
ts.disconnectClntFromLeader(shardcfg.Gid1)
|
||||||
|
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts
|
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts
|
||||||
@ -87,14 +86,14 @@ func TestJoinBasic5A(t *testing.T) {
|
|||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
ka, va := ts.SpreadPuts(ck, NKEYS)
|
||||||
|
|
||||||
sck := ts.ShardCtrler()
|
sck := ts.ShardCtrler()
|
||||||
cfg, _ := sck.Query()
|
cfg := sck.Query()
|
||||||
|
|
||||||
gid2 := ts.newGid()
|
gid2 := ts.newGid()
|
||||||
if ok := ts.joinGroups(sck, []tester.Tgid{gid2}); !ok {
|
if ok := ts.joinGroups(sck, []tester.Tgid{gid2}); !ok {
|
||||||
ts.t.Fatalf("TestJoinBasic5A: joinGroups failed")
|
ts.t.Fatalf("TestJoinBasic5A: joinGroups failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg1, _ := sck.Query()
|
cfg1 := sck.Query()
|
||||||
if cfg.Num+1 != cfg1.Num {
|
if cfg.Num+1 != cfg1.Num {
|
||||||
ts.t.Fatalf("TestJoinBasic5A: wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
|
ts.t.Fatalf("TestJoinBasic5A: wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
|
||||||
}
|
}
|
||||||
@ -270,7 +269,7 @@ func TestShutdown5A(t *testing.T) {
|
|||||||
|
|
||||||
// Test that Gets for keys at groups that are alive
|
// Test that Gets for keys at groups that are alive
|
||||||
// return
|
// return
|
||||||
func TestProgressShutdown(t *testing.T) {
|
func TestProgressShutdown5A(t *testing.T) {
|
||||||
const (
|
const (
|
||||||
NJOIN = 4
|
NJOIN = 4
|
||||||
NSEC = 2
|
NSEC = 2
|
||||||
@ -299,7 +298,7 @@ func TestProgressShutdown(t *testing.T) {
|
|||||||
alive[g] = true
|
alive[g] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg, _ := sck.Query()
|
cfg := sck.Query()
|
||||||
|
|
||||||
ch := make(chan rpc.Err)
|
ch := make(chan rpc.Err)
|
||||||
go func() {
|
go func() {
|
||||||
@ -322,7 +321,7 @@ func TestProgressShutdown(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Test that Gets from a non-moving shard return quickly
|
// Test that Gets from a non-moving shard return quickly
|
||||||
func TestProgressJoin(t *testing.T) {
|
func TestProgressJoin5A(t *testing.T) {
|
||||||
const (
|
const (
|
||||||
NJOIN = 4
|
NJOIN = 4
|
||||||
NSEC = 4
|
NSEC = 4
|
||||||
@ -341,7 +340,7 @@ func TestProgressJoin(t *testing.T) {
|
|||||||
grps := ts.groups(NJOIN)
|
grps := ts.groups(NJOIN)
|
||||||
ts.joinGroups(sck, grps)
|
ts.joinGroups(sck, grps)
|
||||||
|
|
||||||
cfg, _ := sck.Query()
|
cfg := sck.Query()
|
||||||
newcfg := cfg.Copy()
|
newcfg := cfg.Copy()
|
||||||
newgid := tester.Tgid(NJOIN + 3)
|
newgid := tester.Tgid(NJOIN + 3)
|
||||||
if ok := newcfg.JoinBalance(map[tester.Tgid][]string{newgid: []string{"xxx"}}); !ok {
|
if ok := newcfg.JoinBalance(map[tester.Tgid][]string{newgid: []string{"xxx"}}); !ok {
|
||||||
@ -409,7 +408,7 @@ func TestProgressJoin(t *testing.T) {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case cnt := <-ch1:
|
case cnt := <-ch1:
|
||||||
log.Printf("cnt %d", cnt)
|
//log.Printf("cnt %d", cnt)
|
||||||
if cnt < NCNT {
|
if cnt < NCNT {
|
||||||
ts.Fatalf("Two few gets finished %d; expected more than %d", cnt, NCNT)
|
ts.Fatalf("Two few gets finished %d; expected more than %d", cnt, NCNT)
|
||||||
}
|
}
|
||||||
@ -492,7 +491,7 @@ func TestJoinLeave5B(t *testing.T) {
|
|||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
ka, va := ts.SpreadPuts(ck, NKEYS)
|
||||||
|
|
||||||
sck := ts.ShardCtrler()
|
sck := ts.ShardCtrler()
|
||||||
cfg, _ := sck.Query()
|
cfg := sck.Query()
|
||||||
|
|
||||||
ts.Group(gid1).Shutdown()
|
ts.Group(gid1).Shutdown()
|
||||||
|
|
||||||
@ -521,7 +520,7 @@ func TestJoinLeave5B(t *testing.T) {
|
|||||||
ts.Fatalf("Join didn't complete")
|
ts.Fatalf("Join didn't complete")
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg1, _ := sck.Query()
|
cfg1 := sck.Query()
|
||||||
if cfg.Num+1 != cfg1.Num {
|
if cfg.Num+1 != cfg1.Num {
|
||||||
ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
|
ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
|
||||||
}
|
}
|
||||||
@ -569,140 +568,28 @@ func TestRecoverCtrler5B(t *testing.T) {
|
|||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
ka, va := ts.SpreadPuts(ck, NKEYS)
|
||||||
|
|
||||||
for i := 0; i < NPARTITION; i++ {
|
for i := 0; i < NPARTITION; i++ {
|
||||||
ts.killCtrler(ck, gid, ka, va)
|
ts.partitionCtrler(ck, gid, ka, va)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test concurrent ctrlers fighting for leadership reliable
|
// Test concurrent ctrlers fighting for leadership reliable
|
||||||
func TestAcquireLockConcurrentReliable5C(t *testing.T) {
|
func TestConcurrentReliable5C(t *testing.T) {
|
||||||
ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers acquiring leadership ...", true)
|
ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers ...", true)
|
||||||
defer ts.Cleanup()
|
defer ts.Cleanup()
|
||||||
ts.setupKVService()
|
ts.setupKVService()
|
||||||
ck := ts.MakeClerk()
|
ck := ts.MakeClerk()
|
||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
ka, va := ts.SpreadPuts(ck, NKEYS)
|
||||||
ts.electCtrler(ck, ka, va)
|
ts.concurCtrler(ck, ka, va)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test concurrent ctrlers fighting for leadership unreliable
|
// Test concurrent ctrlers fighting for leadership unreliable
|
||||||
func TestAcquireLockConcurrentUnreliable5C(t *testing.T) {
|
func TestAcquireLockConcurrentUnreliable5C(t *testing.T) {
|
||||||
ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers acquiring leadership ...", false)
|
ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers ...", false)
|
||||||
defer ts.Cleanup()
|
defer ts.Cleanup()
|
||||||
ts.setupKVService()
|
ts.setupKVService()
|
||||||
ck := ts.MakeClerk()
|
ck := ts.MakeClerk()
|
||||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
ka, va := ts.SpreadPuts(ck, NKEYS)
|
||||||
ts.electCtrler(ck, ka, va)
|
ts.concurCtrler(ck, ka, va)
|
||||||
}
|
|
||||||
|
|
||||||
// Test that ReleaseLock allows a new leader to start quickly
|
|
||||||
func TestLeaseBasicRelease5C(t *testing.T) {
|
|
||||||
ts := MakeTestLeases(t, "Test (5C): release lease ...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
ts.setupKVService()
|
|
||||||
|
|
||||||
sck0, clnt0 := ts.makeShardCtrlerClnt()
|
|
||||||
go func() {
|
|
||||||
sck0.InitController()
|
|
||||||
time.Sleep(200 * time.Millisecond)
|
|
||||||
sck0.ExitController()
|
|
||||||
}()
|
|
||||||
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
// start new controller
|
|
||||||
sck1, clnt1 := ts.makeShardCtrlerClnt()
|
|
||||||
ch := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
sck1.InitController()
|
|
||||||
time.Sleep(200 * time.Millisecond)
|
|
||||||
sck1.ExitController()
|
|
||||||
ch <- struct{}{}
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-time.After(1 * time.Second):
|
|
||||||
ts.Fatalf("Release didn't give up leadership")
|
|
||||||
case <-ch:
|
|
||||||
}
|
|
||||||
|
|
||||||
ts.Config.DeleteClient(clnt0)
|
|
||||||
ts.Config.DeleteClient(clnt1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test lease expiring
|
|
||||||
func TestLeaseBasicExpire5C(t *testing.T) {
|
|
||||||
ts := MakeTestLeases(t, "Test (5C): lease expiring ...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
ts.setupKVService()
|
|
||||||
|
|
||||||
sck0, clnt0 := ts.makeShardCtrlerClnt()
|
|
||||||
go func() {
|
|
||||||
sck0.InitController()
|
|
||||||
for {
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
|
||||||
// partition sck0 forever
|
|
||||||
clnt0.DisconnectAll()
|
|
||||||
|
|
||||||
// start new controller
|
|
||||||
sck1, clnt1 := ts.makeShardCtrlerClnt()
|
|
||||||
ch := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
sck1.InitController()
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
sck1.ExitController()
|
|
||||||
ch <- struct{}{}
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-time.After((param.LEASETIMESEC + 1) * time.Second):
|
|
||||||
ts.Fatalf("Lease didn't expire")
|
|
||||||
case <-ch:
|
|
||||||
}
|
|
||||||
|
|
||||||
ts.Config.DeleteClient(clnt0)
|
|
||||||
ts.Config.DeleteClient(clnt1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test lease is being extended
|
|
||||||
func TestLeaseBasicRefresh5C(t *testing.T) {
|
|
||||||
const LEADERSEC = 3
|
|
||||||
|
|
||||||
ts := MakeTestLeases(t, "Test (5C): lease refresh ...", true)
|
|
||||||
defer ts.Cleanup()
|
|
||||||
ts.setupKVService()
|
|
||||||
|
|
||||||
sck0, clnt0 := ts.makeShardCtrlerClnt()
|
|
||||||
go func() {
|
|
||||||
sck0.InitController()
|
|
||||||
time.Sleep(LEADERSEC * param.LEASETIMESEC * time.Second)
|
|
||||||
sck0.ExitController()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// give sck0 time to become leader
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
|
||||||
// start new controller
|
|
||||||
sck1, clnt1 := ts.makeShardCtrlerClnt()
|
|
||||||
ch := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
sck1.InitController()
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
sck1.ExitController()
|
|
||||||
ch <- struct{}{}
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-time.After((LEADERSEC + param.LEASETIMESEC + 1) * time.Second):
|
|
||||||
case <-ch:
|
|
||||||
ts.Fatalf("Lease not refreshed")
|
|
||||||
}
|
|
||||||
|
|
||||||
ts.Config.DeleteClient(clnt0)
|
|
||||||
ts.Config.DeleteClient(clnt1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test if old leader is fenced off when reconnecting while it is in
|
// Test if old leader is fenced off when reconnecting while it is in
|
||||||
@ -710,6 +597,7 @@ func TestLeaseBasicRefresh5C(t *testing.T) {
|
|||||||
func TestPartitionControllerJoin5C(t *testing.T) {
|
func TestPartitionControllerJoin5C(t *testing.T) {
|
||||||
const (
|
const (
|
||||||
NSLEEP = 2
|
NSLEEP = 2
|
||||||
|
NSEC = 1
|
||||||
RAND = 1000
|
RAND = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -739,8 +627,8 @@ func TestPartitionControllerJoin5C(t *testing.T) {
|
|||||||
// partition sck
|
// partition sck
|
||||||
clnt.DisconnectAll()
|
clnt.DisconnectAll()
|
||||||
|
|
||||||
// wait until sck's lease expired before restarting shardgrp `ngid`
|
// wait a while before restarting shardgrp `ngid`
|
||||||
time.Sleep((param.LEASETIMESEC + 1) * time.Second)
|
time.Sleep(NSEC * time.Second)
|
||||||
|
|
||||||
ts.Group(ngid).StartServers()
|
ts.Group(ngid).StartServers()
|
||||||
|
|
||||||
@ -748,13 +636,11 @@ func TestPartitionControllerJoin5C(t *testing.T) {
|
|||||||
sck0 := ts.makeShardCtrler()
|
sck0 := ts.makeShardCtrler()
|
||||||
sck0.InitController()
|
sck0.InitController()
|
||||||
|
|
||||||
scfg, _ := sck0.Query()
|
scfg := sck0.Query()
|
||||||
if !scfg.IsMember(ngid) {
|
if !scfg.IsMember(ngid) {
|
||||||
t.Fatalf("Didn't recover gid %d", ngid)
|
t.Fatalf("Didn't recover gid %d", ngid)
|
||||||
}
|
}
|
||||||
|
|
||||||
sck0.ExitController()
|
|
||||||
|
|
||||||
// reconnect old controller, which shouldn't finish ChangeConfigTo
|
// reconnect old controller, which shouldn't finish ChangeConfigTo
|
||||||
clnt.ConnectAll()
|
clnt.ConnectAll()
|
||||||
|
|
||||||
@ -795,7 +681,7 @@ func partitionRecovery5C(t *testing.T, reliable bool, npart, nclnt int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < npart; i++ {
|
for i := 0; i < npart; i++ {
|
||||||
ts.killCtrler(ck, gid, ka, va)
|
ts.partitionCtrler(ck, gid, ka, va)
|
||||||
}
|
}
|
||||||
|
|
||||||
if nclnt > 0 {
|
if nclnt > 0 {
|
||||||
@ -827,7 +713,7 @@ func TestPartitionRecoveryReliableClerks5C(t *testing.T) {
|
|||||||
|
|
||||||
func TestPartitionRecoveryUnreliableClerks5C(t *testing.T) {
|
func TestPartitionRecoveryUnreliableClerks5C(t *testing.T) {
|
||||||
const (
|
const (
|
||||||
NPARTITION = 5
|
NPARTITION = 3
|
||||||
)
|
)
|
||||||
partitionRecovery5C(t, false, NPARTITION, 5)
|
partitionRecovery5C(t, false, NPARTITION, 5)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,7 +16,6 @@ import (
|
|||||||
"6.5840/labrpc"
|
"6.5840/labrpc"
|
||||||
"6.5840/shardkv1/shardcfg"
|
"6.5840/shardkv1/shardcfg"
|
||||||
"6.5840/shardkv1/shardctrler"
|
"6.5840/shardkv1/shardctrler"
|
||||||
"6.5840/shardkv1/shardctrler/param"
|
|
||||||
"6.5840/shardkv1/shardgrp"
|
"6.5840/shardkv1/shardgrp"
|
||||||
"6.5840/tester1"
|
"6.5840/tester1"
|
||||||
)
|
)
|
||||||
@ -27,7 +26,7 @@ type Test struct {
|
|||||||
|
|
||||||
sck *shardctrler.ShardCtrler
|
sck *shardctrler.ShardCtrler
|
||||||
part string
|
part string
|
||||||
leases bool
|
partition bool
|
||||||
|
|
||||||
maxraftstate int
|
maxraftstate int
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
@ -41,11 +40,11 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Setup kvserver for the shard controller and make the controller
|
// Setup kvserver for the shard controller and make the controller
|
||||||
func MakeTestMaxRaft(t *testing.T, part string, reliable, leases bool, maxraftstate int) *Test {
|
func MakeTestMaxRaft(t *testing.T, part string, reliable, partition bool, maxraftstate int) *Test {
|
||||||
ts := &Test{
|
ts := &Test{
|
||||||
ngid: shardcfg.Gid1 + 1, // Gid1 is in use
|
ngid: shardcfg.Gid1 + 1, // Gid1 is in use
|
||||||
t: t,
|
t: t,
|
||||||
leases: leases,
|
partition: partition,
|
||||||
maxraftstate: maxraftstate,
|
maxraftstate: maxraftstate,
|
||||||
}
|
}
|
||||||
cfg := tester.MakeConfig(t, 1, reliable, kvsrv.StartKVServer)
|
cfg := tester.MakeConfig(t, 1, reliable, kvsrv.StartKVServer)
|
||||||
@ -86,7 +85,7 @@ func (ts *Test) makeShardCtrler() *shardctrler.ShardCtrler {
|
|||||||
|
|
||||||
func (ts *Test) makeShardCtrlerClnt() (*shardctrler.ShardCtrler, *tester.Clnt) {
|
func (ts *Test) makeShardCtrlerClnt() (*shardctrler.ShardCtrler, *tester.Clnt) {
|
||||||
clnt := ts.Config.MakeClient()
|
clnt := ts.Config.MakeClient()
|
||||||
return shardctrler.MakeShardCtrler(clnt, ts.leases), clnt
|
return shardctrler.MakeShardCtrler(clnt), clnt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *Test) makeKVClerk() *kvsrv.Clerk {
|
func (ts *Test) makeKVClerk() *kvsrv.Clerk {
|
||||||
@ -128,14 +127,14 @@ func (ts *Test) StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ts *Test) checkMember(sck *shardctrler.ShardCtrler, gid tester.Tgid) bool {
|
func (ts *Test) checkMember(sck *shardctrler.ShardCtrler, gid tester.Tgid) bool {
|
||||||
cfg, _ := sck.Query()
|
cfg := sck.Query()
|
||||||
ok := cfg.IsMember(gid)
|
ok := cfg.IsMember(gid)
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add group gid
|
// Add group gid
|
||||||
func (ts *Test) join(sck *shardctrler.ShardCtrler, gid tester.Tgid, srvs []string) {
|
func (ts *Test) join(sck *shardctrler.ShardCtrler, gid tester.Tgid, srvs []string) {
|
||||||
cfg, _ := sck.Query()
|
cfg := sck.Query()
|
||||||
newcfg := cfg.Copy()
|
newcfg := cfg.Copy()
|
||||||
ok := newcfg.JoinBalance(map[tester.Tgid][]string{gid: srvs})
|
ok := newcfg.JoinBalance(map[tester.Tgid][]string{gid: srvs})
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -158,7 +157,7 @@ func (ts *Test) joinGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) boo
|
|||||||
|
|
||||||
// Group gid leaves.
|
// Group gid leaves.
|
||||||
func (ts *Test) leave(sck *shardctrler.ShardCtrler, gid tester.Tgid) {
|
func (ts *Test) leave(sck *shardctrler.ShardCtrler, gid tester.Tgid) {
|
||||||
cfg, _ := sck.Query()
|
cfg := sck.Query()
|
||||||
newcfg := cfg.Copy()
|
newcfg := cfg.Copy()
|
||||||
ok := newcfg.LeaveBalance([]tester.Tgid{gid})
|
ok := newcfg.LeaveBalance([]tester.Tgid{gid})
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -179,25 +178,12 @@ func (ts *Test) leaveGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) bo
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *Test) disconnectRaftLeader(gid tester.Tgid) (int, string) {
|
func (ts *Test) disconnectClntFromLeader(gid tester.Tgid) int {
|
||||||
_, l := rsm.Leader(ts.Config, gid)
|
ok, l := rsm.Leader(ts.Config, gid)
|
||||||
g := ts.Group(gid)
|
if !ok {
|
||||||
ln := g.SrvName(l)
|
log.Fatalf("Leader failed")
|
||||||
g.DisconnectAll(l)
|
}
|
||||||
return l, ln
|
ts.Group(gid).ShutdownServer(l)
|
||||||
}
|
|
||||||
|
|
||||||
func (ts *Test) reconnectOldLeader(gid tester.Tgid, l int) {
|
|
||||||
g := ts.Group(gid)
|
|
||||||
g.ConnectOne(l)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ts *Test) disconnectClntFromLeader(clnt *tester.Clnt, gid tester.Tgid) int {
|
|
||||||
l, ln := ts.disconnectRaftLeader(gid)
|
|
||||||
p := ts.Group(gid).AllowServersExcept(l)
|
|
||||||
srvs := ts.Group(gid).SrvNamesTo(p)
|
|
||||||
clnt.Disconnect(ln)
|
|
||||||
clnt.ConnectTo(srvs)
|
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -252,15 +238,14 @@ func (ts *Test) checkShutdownSharding(down tester.Tgid, ka []string, va []string
|
|||||||
|
|
||||||
// Run one controler and then partition it after some time. Run
|
// Run one controler and then partition it after some time. Run
|
||||||
// another cntrler that must finish the first ctrler's unfinished
|
// another cntrler that must finish the first ctrler's unfinished
|
||||||
// shard moves. To ensure first ctrler is in a join/leave the test
|
// shard moves. To make it likely that first ctrler is in a join/leave
|
||||||
// shuts down shardgrp `gid`. After the second controller is done,
|
// the test shuts down shardgrp `gid`. After the second controller is
|
||||||
// heal the partition to test if Freeze,InstallShard, and Delete are
|
// done, heal the partition. partitionCtrler returns if recovery
|
||||||
// are fenced.
|
// happened.
|
||||||
func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
|
func (ts *Test) partitionCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
|
||||||
const (
|
const (
|
||||||
NSLEEP = 2
|
RAND = 400
|
||||||
|
NSEC = 1
|
||||||
RAND = 1000
|
|
||||||
|
|
||||||
JOIN = 1
|
JOIN = 1
|
||||||
LEAVE = 2
|
LEAVE = 2
|
||||||
@ -269,7 +254,7 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string)
|
|||||||
sck, clnt := ts.makeShardCtrlerClnt()
|
sck, clnt := ts.makeShardCtrlerClnt()
|
||||||
sck.InitController()
|
sck.InitController()
|
||||||
|
|
||||||
cfg, _ := ts.ShardCtrler().Query()
|
cfg := ts.ShardCtrler().Query()
|
||||||
num := cfg.Num
|
num := cfg.Num
|
||||||
|
|
||||||
state := 0
|
state := 0
|
||||||
@ -283,50 +268,57 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string)
|
|||||||
state = LEAVE
|
state = LEAVE
|
||||||
ts.leaveGroups(sck, []tester.Tgid{ngid})
|
ts.leaveGroups(sck, []tester.Tgid{ngid})
|
||||||
} else {
|
} else {
|
||||||
//log.Printf("deposed")
|
//log.Printf("%v: deposed", sck.Id())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// let sck run for a little while
|
||||||
|
time.Sleep(1000 * time.Millisecond)
|
||||||
|
|
||||||
r := rand.Int() % RAND
|
r := rand.Int() % RAND
|
||||||
d := time.Duration(r) * time.Millisecond
|
d := time.Duration(r) * time.Millisecond
|
||||||
time.Sleep(d)
|
time.Sleep(d)
|
||||||
|
|
||||||
//log.Printf("shutdown gid %d after %dms", gid, r)
|
//log.Printf("shutdown gid %d after %dms %v", gid, r, time.Now().Sub(t))
|
||||||
|
|
||||||
ts.Group(gid).Shutdown()
|
ts.Group(gid).Shutdown()
|
||||||
|
|
||||||
// sleep for a while to get the chance for the controler to get stuck
|
// sleep for a while to get sck stuck in join or leave, because
|
||||||
// in join or leave, because gid is down
|
// gid is down
|
||||||
time.Sleep(NSLEEP * time.Second)
|
time.Sleep(1000 * time.Millisecond)
|
||||||
|
|
||||||
//log.Printf("disconnect sck %v ngid %d num %d state %d", d, ngid, num, state)
|
//log.Printf("disconnect sck %v ngid %d num %d state %d", d, ngid, num, state)
|
||||||
|
|
||||||
// partition controller
|
// partition controller
|
||||||
clnt.DisconnectAll()
|
clnt.DisconnectAll()
|
||||||
|
|
||||||
if ts.leases {
|
if ts.partition {
|
||||||
// wait until sck's lease expired before restarting shardgrp `gid`
|
// wait a while before restarting shardgrp `gid`
|
||||||
time.Sleep((param.LEASETIMESEC + 1) * time.Second)
|
time.Sleep(NSEC * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//log.Printf("startservers %v lease expired %t", time.Now().Sub(t), ts.leases)
|
||||||
|
|
||||||
ts.Group(gid).StartServers()
|
ts.Group(gid).StartServers()
|
||||||
|
|
||||||
// start new controler to pick up where sck left off
|
// start new controler to pick up where sck left off
|
||||||
sck0, clnt0 := ts.makeShardCtrlerClnt()
|
sck0, clnt0 := ts.makeShardCtrlerClnt()
|
||||||
|
|
||||||
sck0.InitController()
|
sck0.InitController()
|
||||||
cfg, _ = sck0.Query()
|
cfg = sck0.Query()
|
||||||
s := "join"
|
s := "join"
|
||||||
if state == LEAVE {
|
if state == LEAVE {
|
||||||
s = "leave"
|
s = "leave"
|
||||||
}
|
}
|
||||||
//log.Printf("%v cfg %v recovered %s", s, cfg, s)
|
|
||||||
|
|
||||||
if cfg.Num <= num {
|
if cfg.Num <= num {
|
||||||
ts.Fatalf("didn't recover; expected %d > %d", num, cfg.Num)
|
ts.Fatalf("didn't recover; expected %d > %d", num, cfg.Num)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//log.Printf("%v: recovered %v %v %v", sck0.Id(), time.Now().Sub(t), s, cfg)
|
||||||
|
|
||||||
present := cfg.IsMember(ngid)
|
present := cfg.IsMember(ngid)
|
||||||
if (state == JOIN && !present) || (state == LEAVE && present) {
|
if (state == JOIN && !present) || (state == LEAVE && present) {
|
||||||
ts.Fatalf("didn't recover %d correctly after %v", ngid, s)
|
ts.Fatalf("didn't recover %d correctly after %v", ngid, s)
|
||||||
@ -337,32 +329,30 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string)
|
|||||||
ts.leaveGroups(sck0, []tester.Tgid{ngid})
|
ts.leaveGroups(sck0, []tester.Tgid{ngid})
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
if ts.partition {
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
|
||||||
}
|
|
||||||
|
|
||||||
sck0.ExitController()
|
|
||||||
|
|
||||||
if ts.leases {
|
|
||||||
//log.Printf("reconnect old controller")
|
|
||||||
|
|
||||||
// reconnect old controller, which should bail out, because
|
// reconnect old controller, which should bail out, because
|
||||||
// it has been superseded.
|
// it has been superseded.
|
||||||
clnt.ConnectAll()
|
clnt.ConnectAll()
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//log.Printf("reconnected %v", time.Now().Sub(t))
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
for i := 0; i < len(ka); i++ {
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
//log.Printf("done get %v", time.Now().Sub(t))
|
||||||
|
|
||||||
ts.Config.DeleteClient(clnt)
|
ts.Config.DeleteClient(clnt)
|
||||||
ts.Config.DeleteClient(clnt0)
|
ts.Config.DeleteClient(clnt0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
|
func (ts *Test) concurCtrler(ck kvtest.IKVClerk, ka, va []string) {
|
||||||
const (
|
const (
|
||||||
NSEC = 5
|
NSEC = 2
|
||||||
N = 4
|
N = 4
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -376,16 +366,16 @@ func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
|
|||||||
ngid := ts.newGid()
|
ngid := ts.newGid()
|
||||||
sck := ts.makeShardCtrler()
|
sck := ts.makeShardCtrler()
|
||||||
sck.InitController()
|
sck.InitController()
|
||||||
//log.Printf("%d(%p): join/leave %v", i, sck, ngid)
|
//log.Printf("%v: electCtrler %d join/leave %v", sck.Id(), i, ngid)
|
||||||
ts.joinGroups(sck, []tester.Tgid{ngid})
|
ts.joinGroups(sck, []tester.Tgid{ngid})
|
||||||
if ok := ts.checkMember(sck, ngid); ok {
|
if ok := ts.checkMember(sck, ngid); ok {
|
||||||
|
//log.Printf("%v: electCtrler %d leave %d", sck.Id(), i, ngid)
|
||||||
if ok := ts.leaveGroups(sck, []tester.Tgid{ngid}); !ok {
|
if ok := ts.leaveGroups(sck, []tester.Tgid{ngid}); !ok {
|
||||||
log.Fatalf("electCtrler: %d(%p): leave %v failed", i, sck, ngid)
|
//log.Printf("%v: electCtrler %d leave %v failed", sck.Id(), i, ngid)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Fatalf("electCtrler: %d(%p): join %v failed", i, sck, ngid)
|
//log.Printf("%v: electCtrler %d join %v failed", sck.Id(), i, ngid)
|
||||||
}
|
}
|
||||||
sck.ExitController()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -399,6 +389,7 @@ func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
|
|||||||
for i := 0; i < N; i++ {
|
for i := 0; i < N; i++ {
|
||||||
ch <- struct{}{}
|
ch <- struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < len(ka); i++ {
|
for i := 0; i < len(ka); i++ {
|
||||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,169 +0,0 @@
|
|||||||
=== RUN TestAcquireLockConcurrentReliable5C
|
|
||||||
Test (5C): Concurent ctrlers acquiring leadership ... (reliable network)...
|
|
||||||
2025/03/20 09:18:33 PecUxIPV: acquire success 1 l 2.999731394s
|
|
||||||
2025/03/20 09:18:35 PecUxIPV: Release 3
|
|
||||||
2025/03/20 09:18:35 aKDBLFuF: acquire success 4 l 2.999504542s
|
|
||||||
2025/03/20 09:18:37 aKDBLFuF: Release 6
|
|
||||||
2025/03/20 09:18:37 HxhaFlAP: acquire success 7 l 2.999622621s
|
|
||||||
2025/03/20 09:18:39 HxhaFlAP: Release 9
|
|
||||||
2025/03/20 09:18:39 LpTmFCGC: acquire success 10 l 2.999747179s
|
|
||||||
2025/03/20 09:18:41 LpTmFCGC: Release 13
|
|
||||||
2025/03/20 09:18:41 klmldUQn: acquire success 14 l 2.999558604s
|
|
||||||
2025/03/20 09:18:43 klmldUQn: Release 17
|
|
||||||
2025/03/20 09:18:43 AWgiWKPZ: acquire success 18 l 2.999701903s
|
|
||||||
2025/03/20 09:18:46 AWgiWKPZ: Release 21
|
|
||||||
... Passed -- 16.4 1 2061 120
|
|
||||||
--- PASS: TestAcquireLockConcurrentReliable5C (16.38s)
|
|
||||||
=== RUN TestAcquireLockConcurrentUnreliable5C
|
|
||||||
Test (5C): Concurent ctrlers acquiring leadership ... (unreliable network)...
|
|
||||||
2025/03/20 09:19:00 xulPPlwd: acquire success 2 l 2.768860613s
|
|
||||||
2025/03/20 09:19:05 xulPPlwd: Release 6
|
|
||||||
2025/03/20 09:19:05 SGXgIJeR: acquire success 7 l 2.984694448s
|
|
||||||
2025/03/20 09:19:08 SGXgIJeR: Release 11
|
|
||||||
2025/03/20 09:19:08 kNvktGla: acquire success 12 l 2.986135242s
|
|
||||||
2025/03/20 09:19:13 kNvktGla: Release 17
|
|
||||||
2025/03/20 09:19:13 usGKuyeI: acquire success 18 l 2.97484218s
|
|
||||||
2025/03/20 09:19:19 usGKuyeI: Release 24
|
|
||||||
... Passed -- 38.4 1 2226 120
|
|
||||||
--- PASS: TestAcquireLockConcurrentUnreliable5C (38.37s)
|
|
||||||
=== RUN TestLeaseBasicRelease5C
|
|
||||||
Test (5C): release lease ... (reliable network)...
|
|
||||||
2025/03/20 09:19:25 fWllyjFs: acquire success 1 l 2.999778852s
|
|
||||||
2025/03/20 09:19:25 fWllyjFs: Release 2
|
|
||||||
2025/03/20 09:19:25 HqoctgYf: acquire success 3 l 2.999623311s
|
|
||||||
2025/03/20 09:19:26 HqoctgYf: Release 4
|
|
||||||
... Passed -- 0.4 1 17 0
|
|
||||||
--- PASS: TestLeaseBasicRelease5C (0.42s)
|
|
||||||
=== RUN TestLeaseBasicExpire5C
|
|
||||||
Test (5C): lease expiring ... (reliable network)...
|
|
||||||
2025/03/20 09:19:26 MgmIiwHw: acquire success 1 l 2.999622077s
|
|
||||||
2025/03/20 09:19:29 PviuBaqZ: acquire: MgmIiwHw lease expired -31.512117ms
|
|
||||||
2025/03/20 09:19:29 PviuBaqZ: acquire success 2 l 2.9996929s
|
|
||||||
2025/03/20 09:19:29 PviuBaqZ: Release 3
|
|
||||||
... Passed -- 3.1 1 81 0
|
|
||||||
--- PASS: TestLeaseBasicExpire5C (3.14s)
|
|
||||||
=== RUN TestLeaseBasicRefresh5C
|
|
||||||
Test (5C): lease refresh ... (reliable network)...
|
|
||||||
2025/03/20 09:19:29 CqhHcMdl: acquire success 1 l 2.999690343s
|
|
||||||
... Passed -- 7.1 1 144 0
|
|
||||||
--- PASS: TestLeaseBasicRefresh5C (7.10s)
|
|
||||||
=== RUN TestPartitionControllerJoin5C
|
|
||||||
Test (5C): partition controller in join... (reliable network)...
|
|
||||||
2025/03/20 09:19:38 CqhHcMdl: Release 9
|
|
||||||
2025/03/20 09:19:38 QykadXGi: acquire success 1 l 2.999763148s
|
|
||||||
2025/03/20 09:19:43 YWktoCTH: acquire: QykadXGi lease expired -2.003411436s
|
|
||||||
2025/03/20 09:19:43 YWktoCTH: acquire success 2 l 2.999580573s
|
|
||||||
2025/03/20 09:19:45 YWktoCTH: Release 4
|
|
||||||
2025/03/20 09:19:45 QykadXGi: refresher: exit expired -3.255782562s
|
|
||||||
... Passed -- 11.2 1 1011 120
|
|
||||||
--- PASS: TestPartitionControllerJoin5C (11.22s)
|
|
||||||
=== RUN TestPartitionRecoveryReliableNoClerk5C
|
|
||||||
Test (5C): controllers with leased leadership ... (reliable network)...
|
|
||||||
2025/03/20 09:19:50 nLqpPYYg: acquire success 1 l 2.999773699s
|
|
||||||
2025/03/20 09:19:56 Yauplngb: acquire: nLqpPYYg lease expired -1.030252686s
|
|
||||||
2025/03/20 09:19:56 Yauplngb: acquire success 4 l 2.999760357s
|
|
||||||
2025/03/20 09:19:58 Yauplngb: Release 7
|
|
||||||
2025/03/20 09:19:58 nLqpPYYg: refresher: exit expired -3.848348135s
|
|
||||||
2025/03/20 09:20:00 dsvADejV: acquire success 8 l 2.999675453s
|
|
||||||
2025/03/20 09:20:07 jdcPVdvf: acquire: dsvADejV lease expired -1.56610473s
|
|
||||||
2025/03/20 09:20:07 jdcPVdvf: acquire success 11 l 2.999839821s
|
|
||||||
2025/03/20 09:20:10 jdcPVdvf: Release 15
|
|
||||||
2025/03/20 09:20:10 dsvADejV: refresher: exit expired -4.604218577s
|
|
||||||
2025/03/20 09:20:12 vzVcVtTQ: acquire success 16 l 2.999743618s
|
|
||||||
2025/03/20 09:20:19 valCDRmB: acquire: vzVcVtTQ lease expired -1.988170854s
|
|
||||||
2025/03/20 09:20:19 valCDRmB: acquire success 19 l 2.999667662s
|
|
||||||
2025/03/20 09:20:22 valCDRmB: Release 22
|
|
||||||
2025/03/20 09:20:22 vzVcVtTQ: refresher: exit expired -4.943386258s
|
|
||||||
2025/03/20 09:20:23 RJYqYuLF: acquire success 23 l 2.999774783s
|
|
||||||
2025/03/20 09:20:30 KaeJpVvL: acquire: RJYqYuLF lease expired -1.222157296s
|
|
||||||
2025/03/20 09:20:30 KaeJpVvL: acquire success 26 l 2.999897268s
|
|
||||||
2025/03/20 09:20:33 KaeJpVvL: Release 30
|
|
||||||
2025/03/20 09:20:33 RJYqYuLF: refresher: exit expired -4.429889332s
|
|
||||||
2025/03/20 09:20:34 leVdobnP: acquire success 31 l 2.999770816s
|
|
||||||
2025/03/20 09:20:41 DFnmWean: acquire: leVdobnP lease expired -1.756292497s
|
|
||||||
2025/03/20 09:20:41 DFnmWean: acquire success 34 l 2.999905276s
|
|
||||||
2025/03/20 09:20:44 DFnmWean: Release 38
|
|
||||||
2025/03/20 09:20:44 leVdobnP: refresher: exit expired -4.84260629s
|
|
||||||
... Passed -- 59.3 1 5454 660
|
|
||||||
--- PASS: TestPartitionRecoveryReliableNoClerk5C (59.30s)
|
|
||||||
=== RUN TestPartitionRecoveryUnreliableNoClerk5C
|
|
||||||
Test (5C): controllers with leased leadership ... (unreliable network)...
|
|
||||||
2025/03/20 09:21:01 oBRWPJFn: acquire success 1 l 2.999668901s
|
|
||||||
2025/03/20 09:21:08 WCfEtCSF: acquire: oBRWPJFn lease expired -1.960469635s
|
|
||||||
2025/03/20 09:21:08 WCfEtCSF: acquire success 4 l 2.989064006s
|
|
||||||
2025/03/20 09:21:19 WCfEtCSF: Release 15
|
|
||||||
2025/03/20 09:21:20 oBRWPJFn: refresher: exit expired -13.623366094s
|
|
||||||
2025/03/20 09:21:25 BKOHUPgK: acquire success 16 l 2.974368151s
|
|
||||||
2025/03/20 09:21:32 dpZEDTAn: acquire: BKOHUPgK lease expired -1.266079689s
|
|
||||||
2025/03/20 09:21:32 dpZEDTAn: acquire success 19 l 2.986737971s
|
|
||||||
2025/03/20 09:21:42 dpZEDTAn: Release 29
|
|
||||||
2025/03/20 09:21:43 BKOHUPgK: refresher: exit expired -12.408069097s
|
|
||||||
2025/03/20 09:21:50 TiapOztE: acquire: dpZEDTAn lease expired -4.992859225s
|
|
||||||
2025/03/20 09:21:50 TiapOztE: acquire success 30 l 2.972701594s
|
|
||||||
2025/03/20 09:21:57 aDyCYcpR: acquire: TiapOztE lease expired -1.338848496s
|
|
||||||
2025/03/20 09:21:57 aDyCYcpR: acquire success 33 l 2.99686939s
|
|
||||||
2025/03/20 09:22:07 aDyCYcpR: Release 43
|
|
||||||
2025/03/20 09:22:07 TiapOztE: refresher: exit expired -12.147734461s
|
|
||||||
... Passed -- 86.9 1 4985 420
|
|
||||||
--- PASS: TestPartitionRecoveryUnreliableNoClerk5C (86.88s)
|
|
||||||
=== RUN TestPartitionRecoveryReliableClerks5C
|
|
||||||
Test (5C): controllers with leased leadership ... (reliable network)...
|
|
||||||
2025/03/20 09:22:13 vZrMwEsy: acquire success 1 l 2.999893567s
|
|
||||||
2025/03/20 09:22:20 AFHDpDYV: acquire: vZrMwEsy lease expired -1.657500925s
|
|
||||||
2025/03/20 09:22:20 AFHDpDYV: acquire success 4 l 2.999596975s
|
|
||||||
2025/03/20 09:22:22 AFHDpDYV: Release 6
|
|
||||||
2025/03/20 09:22:22 vZrMwEsy: refresher: exit expired -3.627083489s
|
|
||||||
2025/03/20 09:22:23 tserHLNb: acquire success 7 l 2.999932478s
|
|
||||||
2025/03/20 09:22:29 msIfUgIC: acquire: tserHLNb lease expired -1.13789373s
|
|
||||||
2025/03/20 09:22:29 msIfUgIC: acquire success 10 l 2.999755401s
|
|
||||||
2025/03/20 09:22:31 msIfUgIC: Release 12
|
|
||||||
2025/03/20 09:22:31 tserHLNb: refresher: exit expired -3.083945752s
|
|
||||||
2025/03/20 09:22:32 YLEIZyDn: acquire success 13 l 2.999940475s
|
|
||||||
2025/03/20 09:22:38 TIibzsMc: acquire: YLEIZyDn lease expired -1.017825561s
|
|
||||||
2025/03/20 09:22:38 TIibzsMc: acquire success 16 l 2.999907075s
|
|
||||||
2025/03/20 09:22:40 TIibzsMc: Release 18
|
|
||||||
2025/03/20 09:22:40 YLEIZyDn: refresher: exit expired -2.789136907s
|
|
||||||
2025/03/20 09:22:41 knOnYtxW: acquire success 19 l 2.999891429s
|
|
||||||
2025/03/20 09:22:47 KyiPMsgB: acquire: knOnYtxW lease expired -1.534324297s
|
|
||||||
2025/03/20 09:22:47 KyiPMsgB: acquire success 22 l 2.999822725s
|
|
||||||
2025/03/20 09:22:49 KyiPMsgB: Release 24
|
|
||||||
2025/03/20 09:22:49 knOnYtxW: refresher: exit expired -3.516354686s
|
|
||||||
2025/03/20 09:22:50 wHNCImkl: acquire success 25 l 2.999917928s
|
|
||||||
2025/03/20 09:22:56 CSBcxnyr: acquire: wHNCImkl lease expired -1.051161379s
|
|
||||||
2025/03/20 09:22:56 CSBcxnyr: acquire success 28 l 2.999745303s
|
|
||||||
2025/03/20 09:22:58 CSBcxnyr: Release 31
|
|
||||||
2025/03/20 09:22:58 wHNCImkl: refresher: exit expired -3.241024197s
|
|
||||||
... Passed -- 60.1 1 15934 5124
|
|
||||||
--- PASS: TestPartitionRecoveryReliableClerks5C (60.14s)
|
|
||||||
=== RUN TestPartitionRecoveryUnreliableClerks5C
|
|
||||||
Test (5C): controllers with leased leadership ... (unreliable network)...
|
|
||||||
2025/03/20 09:23:14 ydfNYYir: acquire success 1 l 2.871807366s
|
|
||||||
2025/03/20 09:23:21 KmfOaYym: acquire: ydfNYYir lease expired -1.96910688s
|
|
||||||
2025/03/20 09:23:21 KmfOaYym: acquire success 4 l 2.976357121s
|
|
||||||
2025/03/20 09:23:25 KmfOaYym: Release 9
|
|
||||||
2025/03/20 09:23:26 ydfNYYir: refresher: exit expired -6.960801287s
|
|
||||||
2025/03/20 09:23:27 XErxjiqb: acquire success 10 l 2.994288153s
|
|
||||||
2025/03/20 09:23:34 VQFBAKED: acquire: XErxjiqb lease expired -1.186993995s
|
|
||||||
2025/03/20 09:23:34 VQFBAKED: acquire success 14 l 2.978008397s
|
|
||||||
2025/03/20 09:23:40 VQFBAKED: Release 20
|
|
||||||
2025/03/20 09:23:40 XErxjiqb: refresher: exit expired -7.422563867s
|
|
||||||
2025/03/20 09:23:41 IqJHVjsW: acquire success 21 l 2.984528802s
|
|
||||||
2025/03/20 09:23:47 NAaIOMcb: acquire: IqJHVjsW lease expired -1.19246442s
|
|
||||||
2025/03/20 09:23:48 NAaIOMcb: acquire success 25 l 2.521727902s
|
|
||||||
2025/03/20 09:23:53 NAaIOMcb: Release 30
|
|
||||||
2025/03/20 09:23:53 IqJHVjsW: refresher: exit expired -7.130118022s
|
|
||||||
2025/03/20 09:23:54 pwTkolYO: acquire success 32 l 2.761741697s
|
|
||||||
2025/03/20 09:24:01 GAueeCFX: acquire: pwTkolYO lease expired -1.496813006s
|
|
||||||
2025/03/20 09:24:01 GAueeCFX: acquire success 34 l 2.977558093s
|
|
||||||
2025/03/20 09:24:06 GAueeCFX: Release 39
|
|
||||||
2025/03/20 09:24:06 pwTkolYO: refresher: exit expired -6.843949894s
|
|
||||||
2025/03/20 09:24:09 FIDtQSlF: acquire: GAueeCFX lease expired -15.508321ms
|
|
||||||
2025/03/20 09:24:09 FIDtQSlF: acquire success 40 l 2.998464382s
|
|
||||||
2025/03/20 09:24:16 wExaLSov: acquire: FIDtQSlF lease expired -1.874162487s
|
|
||||||
2025/03/20 09:24:16 wExaLSov: acquire success 43 l 2.860519358s
|
|
||||||
2025/03/20 09:24:21 wExaLSov: Release 48
|
|
||||||
2025/03/20 09:24:21 FIDtQSlF: refresher: exit expired -6.924846198s
|
|
||||||
... Passed -- 68.4 1 10469 1186
|
|
||||||
--- PASS: TestPartitionRecoveryUnreliableClerks5C (68.39s)
|
|
||||||
PASS
|
|
||||||
ok 6.5840/shardkv1 351.349s
|
|
||||||
@ -138,7 +138,7 @@ func (cfg *Config) End() {
|
|||||||
ops := atomic.LoadInt32(&cfg.ops) // number of clerk get/put/append calls
|
ops := atomic.LoadInt32(&cfg.ops) // number of clerk get/put/append calls
|
||||||
|
|
||||||
fmt.Printf(" ... Passed --")
|
fmt.Printf(" ... Passed --")
|
||||||
fmt.Printf(" %4.1f %d %5d %4d\n", t, npeers, nrpc, ops)
|
fmt.Printf(" time %4.1fs #peers %d #RPCs %5d #Ops %4d\n", t, npeers, nrpc, ops)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -299,19 +299,6 @@ func (sg *ServerGrp) MakePartition(l int) ([]int, []int) {
|
|||||||
return p1, p2
|
return p1, p2
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sg *ServerGrp) AllowServersExcept(l int) []int {
|
|
||||||
n := len(sg.srvs) - 1
|
|
||||||
p := make([]int, n)
|
|
||||||
j := 0
|
|
||||||
for i, _ := range sg.srvs {
|
|
||||||
if i != l {
|
|
||||||
p[j] = i
|
|
||||||
j++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sg *ServerGrp) Partition(p1 []int, p2 []int) {
|
func (sg *ServerGrp) Partition(p1 []int, p2 []int) {
|
||||||
//log.Printf("partition servers into: %v %v\n", p1, p2)
|
//log.Printf("partition servers into: %v %v\n", p1, p2)
|
||||||
for i := 0; i < len(p1); i++ {
|
for i := 0; i < len(p1); i++ {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user