Compare commits


10 Commits

Author          SHA1        Message         Date
?               9d4d24dc3e  initial commit  2025-04-16 16:04:51 +02:00
Frans Kaashoek  fa6877b02d  update          2025-04-07 19:39:55 -04:00
Frans Kaashoek  d21b9ffe74  update          2025-04-03 06:53:23 -04:00
Frans Kaashoek  059b435561  update          2025-04-02 07:13:48 -04:00
Frans Kaashoek  c96bad59a5  update          2025-04-02 06:59:12 -04:00
Frans Kaashoek  8743e4fb99  update          2025-04-02 06:53:36 -04:00
Frans Kaashoek  bb597de980  update          2025-04-02 06:49:54 -04:00
Frans Kaashoek  8f21c11cfc  update          2025-04-01 18:04:28 -04:00
Frans Kaashoek  05dc50ab27  remove          2025-03-27 11:47:04 -04:00
Frans Kaashoek  7ba7e45595  x               2025-03-20 10:59:46 -04:00
16 changed files with 418 additions and 649 deletions

View File

@ -32,7 +32,7 @@ func main() {
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
p, err := plugin.Open(filename)
if err != nil {
log.Fatalf("cannot load plugin %v", filename)
log.Fatalf("cannot load plugin %v: %s", filename, err)
}
xmapf, err := p.Lookup("Map")
if err != nil {

View File

@ -5,11 +5,42 @@ import "net"
import "os"
import "net/rpc"
import "net/http"
import "sync"
type status uint8
const (
idle status = iota
progress
completed
)
type Coordinator struct {
// Your definitions here.
mapJobStatus map[string]status
reduceJobStatus map[string]status
workerStatus map[int]status
intermediateFiles map[string][]string
nReduce int
workerCount int
m sync.Mutex
}
func (c *Coordinator) areMapJobsDone() bool {
done := true
for _, job := range c.mapJobStatus {
if job != completed {
done = false
}
}
return done
}
// Your code here -- RPC handlers for the worker to call.
@ -20,25 +51,73 @@ type Coordinator struct {
// the RPC argument and reply types are defined in rpc.go.
//
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
reply.Y = args.X + 1
return nil
}
func (c *Coordinator) Register(args *RegisterArgs, reply *RegisterReply) error {
c.m.Lock()
defer c.m.Unlock()
reply.WorkerId = c.workerCount
c.workerCount += 1
c.workerStatus[reply.WorkerId] = idle
return nil
}
func (c *Coordinator) GetJob(args *GetJobArgs, reply *GetJobReply) error {
c.m.Lock()
defer c.m.Unlock()
if !c.areMapJobsDone() {
for filename, jobStatus := range c.mapJobStatus {
if jobStatus == idle {
var job MapJob
job.InputFile = filename
job.ReducerCount = c.nReduce
c.mapJobStatus[filename] = progress
reply.Finished = false
reply.JobType = Map
reply.MapJob = job
c.workerStatus[args.WorkerId] = progress
return nil
}
}
}
return nil
}
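// A hypothetical continuation (not part of this diff) of GetJob for the
// reduce phase, once areMapJobsDone() returns true: hand reducer r the
// r-th intermediate file of every map task. It assumes reduceJobStatus is
// keyed by strconv.Itoa(r) (which needs an added "strconv" import) and
// that the caller holds c.m, as GetJob does.
func (c *Coordinator) assignReduceJob(args *GetJobArgs, reply *GetJobReply) bool {
	for r := 0; r < c.nReduce; r++ {
		key := strconv.Itoa(r)
		if c.reduceJobStatus[key] != idle {
			continue
		}
		var job ReduceJob
		job.ReducerNumber = r
		// collect the r-th intermediate file produced by each map task
		for _, files := range c.intermediateFiles {
			job.IntermediateFiles = append(job.IntermediateFiles, files[r])
		}
		c.reduceJobStatus[key] = progress
		c.workerStatus[args.WorkerId] = progress
		reply.JobType = Reduce
		reply.ReduceJob = job
		return true
	}
	return false
}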
func (c *Coordinator) Finish(args *MapResult, reply *FinishReply) error {
c.m.Lock()
defer c.m.Unlock()
c.intermediateFiles[args.InputFile] = args.IntermediateFiles
c.mapJobStatus[args.InputFile] = completed
c.workerStatus[args.WorkerId] = idle
return nil
}
//
// start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() {
rpc.Register(c)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", ":1234")
sockname := coordinatorSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
//
@ -46,12 +125,12 @@ func (c *Coordinator) server() {
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
ret := false
// Your code here.
return ret
}
//
@ -60,11 +139,19 @@ func (c *Coordinator) Done() bool {
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
// Your code here.
c.mapJobStatus = make(map[string]status)
c.reduceJobStatus = make(map[string]status)
c.intermediateFiles = make(map[string][]string)
c.workerStatus = make(map[int]status)
c.nReduce = nReduce
for _, v := range files {
c.mapJobStatus[v] = idle
c.intermediateFiles[v] = make([]string, nReduce)
}
c.server()
return &c
}

View File

@ -15,22 +15,70 @@ import "strconv"
//
type ExampleArgs struct {
X int
}
type ExampleReply struct {
Y int
}
// Add your RPC definitions here.
type JobType uint8
const (
Map JobType = iota
Reduce
)
type MapJob struct {
Index int // map task number; doMap uses it to name intermediate files (must be set by the coordinator)
InputFile string
ReducerCount int
//IntermediateFiles []string
}
type ReduceJob struct {
ReducerNumber int
IntermediateFiles []string
}
type MapResult struct {
InputFile string
IntermediateFiles []string
WorkerId int
}
type RegisterArgs struct {
// empty
}
type RegisterReply struct {
WorkerId int
}
type GetJobArgs struct {
WorkerId int
}
type GetJobReply struct {
JobType JobType
MapJob MapJob
ReduceJob ReduceJob
Finished bool
}
type FinishReply struct {
}
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
s := "/var/tmp/5840-mr-"
s += strconv.Itoa(os.Getuid())
return s
s := "/var/tmp/5840-mr-"
s += strconv.Itoa(os.Getuid())
return s
}

View File

@ -4,14 +4,16 @@ import "fmt"
import "log"
import "net/rpc"
import "hash/fnv"
import "os"
import "io"
import "encoding/json"
//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
Key string
Value string
}
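// ByKey implements sort.Interface to order KeyValue pairs by key (the
// same helper mrsequential.go defines); doMap below calls
// sort.Sort(ByKey(kva)) but this diff does not show a definition, so one
// is supplied here.
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }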
//
@ -19,23 +21,91 @@ type KeyValue struct {
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}
var id int
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
// Your worker implementation here.
registerReply := Register()
id = registerReply.WorkerId
done := false
for !done {
reply := GetJob(GetJobArgs{id})
if reply.Finished {
done = true
continue
}
if reply.JobType == Map {
doMap(mapf, reply.MapJob)
} else if reply.JobType == Reduce {
doReduce(reducef, reply.ReduceJob)
}
}
// uncomment to send the Example RPC to the coordinator.
// CallExample()
}
func doMap(mapf func(string, string) []KeyValue, job MapJob) {
file, err := os.Open(job.InputFile)
if err != nil {
log.Fatalf("[Error] Could not open file %s", job.InputFile)
}
content, err := io.ReadAll(file)
if err != nil {
log.Fatalf("[Error] Could not read file %s", job.InputFile)
}
file.Close()
kva := mapf(job.InputFile, string(content))
sort.Sort(ByKey(kva))
partitions := make([][]KeyValue, job.ReducerCount)
for _, v := range kva {
i := ihash(v.Key) % job.ReducerCount
partitions[i] = append(partitions[i], v)
}
intermediateFiles := make([]string, job.ReducerCount)
for i, v := range partitions {
filename := fmt.Sprintf("mr-%v-%v.txt", job.Index, i)
file, err := os.Create(filename)
if err != nil {
log.Fatalf("[Error] Could not create file %s", filename)
}
enc := json.NewEncoder(file)
for _, kv := range v {
err := enc.Encode(&kv)
if err != nil {
log.Fatalf("[Error] Could not write to file %s", filename)
}
}
intermediateFiles[i] = filename
file.Close()
}
Finish(MapResult{job.InputFile, intermediateFiles, id})
}
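// doReduce is called by Worker above but its definition is not shown in
// this diff. A minimal sketch, assuming the ReduceJob type from rpc.go
// and the lab's usual mr-out-R output convention: decode every
// intermediate file for this reducer, sort by key, and apply reducef to
// each group. Reporting completion back to the coordinator (as doMap
// does via Finish) is left out.
func doReduce(reducef func(string, []string) string, job ReduceJob) {
	kva := []KeyValue{}
	for _, filename := range job.IntermediateFiles {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("[Error] Could not open file %s", filename)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		file.Close()
	}
	sort.Sort(ByKey(kva))
	oname := fmt.Sprintf("mr-out-%d", job.ReducerNumber)
	ofile, err := os.Create(oname)
	if err != nil {
		log.Fatalf("[Error] Could not create file %s", oname)
	}
	// group values with equal keys and emit one line per key
	for i := 0; i < len(kva); {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, reducef(kva[i].Key, values))
		i = j
	}
	ofile.Close()
}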
//
@ -45,26 +115,53 @@ func Worker(mapf func(string, string) []KeyValue,
//
func CallExample() {
// declare an argument structure.
args := ExampleArgs{}
// fill in the argument(s).
args.X = 99
// declare a reply structure.
reply := ExampleReply{}
// send the RPC request, wait for the reply.
// the "Coordinator.Example" tells the
// receiving server that we'd like to call
// the Example() method of struct Coordinator.
ok := call("Coordinator.Example", &args, &reply)
if ok {
// reply.Y should be 100.
fmt.Printf("reply.Y %v\n", reply.Y)
} else {
fmt.Printf("call failed!\n")
}
call("Coordinatior.Wrong", &args, &reply);
}
func Register() RegisterReply {
var args RegisterArgs
var reply RegisterReply
call("Coordinator.Register", &args, &reply)
return reply
}
func GetJob(args GetJobArgs) GetJobReply {
var reply GetJobReply
call("Coordinator.GetJob", &args, &reply)
return reply
}
func Finish(args MapResult) FinishReply {
var reply FinishReply
call("Coordinator.Finish", &args, &reply)
return reply
}
//
@ -73,19 +170,19 @@ func CallExample() {
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := coordinatorSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()
err = c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}

View File

@ -1,31 +0,0 @@
package lock
import (
"6.5840/kvsrv1"
"6.5840/kvsrv1/rpc"
"6.5840/shardkv1/shardctrler/param"
)
type Lock struct {
ck *kvsrv.Clerk
}
// Use l as the key to store the "lock state" (you would have to decide
// precisely what the lock state is).
func MakeLock(ck kvtest.IKVClerk, l string) *Lock {
lk := &Lock{ck: ck.(*kvsrv.Clerk)}
// You may add code here
return lk
}
func (lk *Lock) Acquire() {
// You may add code here.
}
func (lk *Lock) Release() {
// You may add code here.
}
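// A minimal sketch (not the assignment's solution) of the approach the
// comment above suggests, against the versioned Get/Put API that the
// test file below exercises: "" means unlocked, any other value names
// the holder, and rpc.ErrMaybe must be treated as possible success. The
// helper name acquireSketch and the holder id `me` are hypothetical.
func acquireSketch(ck kvtest.IKVClerk, l string, me string) {
	for {
		if val, ver, err := ck.Get(l); err == rpc.OK && val == "" {
			// try to claim the lock at the version we observed
			switch ck.Put(l, me, ver) {
			case rpc.OK:
				return
			case rpc.ErrMaybe:
				// the Put may have been applied; re-read to find out
				if v, _, e := ck.Get(l); e == rpc.OK && v == me {
					return
				}
			}
		}
	}
}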

View File

@ -1,89 +0,0 @@
package lock
import (
"fmt"
// "log"
"strconv"
"testing"
"time"
"6.5840/kvsrv1"
"6.5840/kvsrv1/rpc"
"6.5840/kvtest1"
)
const (
NACQUIRE = 10
NCLNT = 10
NSEC = 2
)
func oneClient(t *testing.T, me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
lk := MakeLock(ck, "l")
ck.Put("l0", "", 0)
for i := 1; true; i++ {
select {
case <-done:
return kvtest.ClntRes{i, 0}
default:
lk.Acquire()
// log.Printf("%d: acquired lock", me)
b := strconv.Itoa(me)
val, ver, err := ck.Get("l0")
if err == rpc.OK {
if val != "" {
t.Fatalf("%d: two clients acquired lock %v", me, val)
}
} else {
t.Fatalf("%d: get failed %v", me, err)
}
err = ck.Put("l0", string(b), ver)
if !(err == rpc.OK || err == rpc.ErrMaybe) {
t.Fatalf("%d: put failed %v", me, err)
}
time.Sleep(10 * time.Millisecond)
err = ck.Put("l0", "", ver+1)
if !(err == rpc.OK || err == rpc.ErrMaybe) {
t.Fatalf("%d: put failed %v", me, err)
}
// log.Printf("%d: release lock", me)
lk.Release()
}
}
return kvtest.ClntRes{}
}
// Run test clients
func runClients(t *testing.T, nclnt int, reliable bool) {
ts := kvsrv.MakeTestKV(t, reliable)
defer ts.Cleanup()
ts.Begin(fmt.Sprintf("Test: %d lock clients", nclnt))
ts.SpawnClientsAndWait(nclnt, NSEC*time.Second, func(me int, myck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
return oneClient(t, me, myck, done)
})
}
func TestOneClientReliable(t *testing.T) {
runClients(t, 1, true)
}
func TestManyClientsReliable(t *testing.T) {
runClients(t, NCLNT, true)
}
func TestOneClientUnreliable(t *testing.T) {
runClients(t, 1, false)
}
func TestManyClientsUnreliable(t *testing.T) {
runClients(t, NCLNT, false)
}

View File

@ -1,7 +0,0 @@
package param
const (
// Length of time that a lease is valid. If a client doesn't
// refresh the lease within this time, the lease will expire.
LEASETIMESEC = 3
)

View File

@ -6,10 +6,7 @@ package shardctrler
import (
"sync/atomic"
"6.5840/kvsrv1"
"6.5840/kvsrv1/rpc"
"6.5840/kvtest1"
"6.5840/shardkv1/shardcfg"
"6.5840/tester1"
@ -22,14 +19,13 @@ type ShardCtrler struct {
kvtest.IKVClerk
killed int32 // set by Kill()
leases bool
// Your data here.
}
// Make a ShardCtrler, which stores its state in a kvsrv.
func MakeShardCtrler(clnt *tester.Clnt) *ShardCtrler {
sck := &ShardCtrler{clnt: clnt}
srv := tester.ServerName(tester.GRP0, 0)
sck.IKVClerk = kvsrv.MakeClerk(clnt, srv)
// Your code here.
@ -38,20 +34,15 @@ func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler {
// The tester calls InitController() before starting a new
// controller. In part A, this method doesn't need to do anything. In
// B and C, this method implements recovery.
func (sck *ShardCtrler) InitController() {
}
// The tester calls ExitController to exit a controller. In part B and
// C, release lock.
func (sck *ShardCtrler) ExitController() {
}
// Called once by the tester to supply the first configuration. You
// can marshal ShardConfig into a string using shardcfg.String(), and
// then Put it in the kvsrv for the controller at version 0. You can
// pick the key to name the configuration. The initial configuration
// lists shardgrp shardcfg.Gid1 for all shards.
func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
// Your code here
}
@ -61,25 +52,13 @@ func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
// changes the configuration it may be superseded by another
// controller.
func (sck *ShardCtrler) ChangeConfigTo(new *shardcfg.ShardConfig) {
return
}
// Tester "kills" shardctrler by calling Kill(). For your
// convenience, we also supply isKilled() method to test killed in
// loops.
func (sck *ShardCtrler) Kill() {
atomic.StoreInt32(&sck.killed, 1)
}
func (sck *ShardCtrler) isKilled() bool {
z := atomic.LoadInt32(&sck.killed)
return z == 1
}
// Return the current configuration
func (sck *ShardCtrler) Query() *shardcfg.ShardConfig {
// Your code here.
return nil
}
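// A minimal sketch (not this lab's required solution) of how InitConfig
// and Query could round-trip the configuration through the kvsrv, per
// the comments above. The key name "config" and the shardcfg.FromString
// parser are assumptions; cfg.String() is the marshaller the InitConfig
// comment mentions.
//
//	func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
//		// a Put at version 0 succeeds only if no configuration exists yet
//		sck.Put("config", cfg.String(), 0)
//	}
//
//	func (sck *ShardCtrler) Query() *shardcfg.ShardConfig {
//		val, _, err := sck.Get("config")
//		if err != rpc.OK {
//			return nil
//		}
//		return shardcfg.FromString(val)
//	}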

View File

@ -2,7 +2,6 @@ package shardgrp
import (
"6.5840/kvsrv1/rpc"
"6.5840/shardkv1/shardcfg"
"6.5840/tester1"
@ -11,7 +10,7 @@ import (
type Clerk struct {
clnt *tester.Clnt
servers []string
leader int // last successful leader (index into servers[])
// You will have to modify this struct.
}
func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk {
@ -19,24 +18,27 @@ func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk {
return ck
}
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
// Your code here
return "", 0, ""
}
func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {
// Your code here
return ""
}
func (ck *Clerk) FreezeShard(s shardcfg.Tshid, num shardcfg.Tnum) ([]byte, rpc.Err) {
// Your code here
return nil, ""
}
func (ck *Clerk) InstallShard(s shardcfg.Tshid, state []byte, num shardcfg.Tnum) rpc.Err {
// Your code here
return ""
}
func (ck *Clerk) DeleteShard(s shardcfg.Tshid, num shardcfg.Tnum) rpc.Err {
// Your code here
return ""
}

View File

@ -14,11 +14,12 @@ import (
type KVServer struct {
gid tester.Tgid
me int
dead int32 // set by Kill()
rsm *rsm.RSM
// Your code here
}
@ -37,17 +38,17 @@ func (kv *KVServer) Restore(data []byte) {
// Your code here
}
func (kv *KVServer) Get(args *rpc.GetArgs, reply *rpc.GetReply) {
// Your code here
}
func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) {
// Your code here
}
// Freeze the specified shard (i.e., reject future Get/Puts for this
// shard) and return the key/values stored in that shard.
func (kv *KVServer) FreezeShard(args *shardrpc.FreezeShardArgs, reply *shardrpc.FreezeShardReply) {
// Your code here
}
@ -57,7 +58,7 @@ func (kv *KVServer) InstallShard(args *shardrpc.InstallShardArgs, reply *shardrp
}
// Delete the specified shard.
func (kv *KVServer) DeleteShard(args *shardrpc.DeleteShardArgs, reply *shardrpc.DeleteShardReply) {
// Your code here
}
@ -86,9 +87,9 @@ func (kv *KVServer) killed() bool {
func StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService {
// call labgob.Register on structures you want
// Go's RPC library to marshal/unmarshal.
labgob.Register(rpc.PutArgs{})
labgob.Register(rpc.GetArgs{})
labgob.Register(shardrpc.FreezeShardArgs{})
labgob.Register(shardrpc.InstallShardArgs{})
labgob.Register(shardrpc.DeleteShardArgs{})
labgob.Register(rsm.Op{})
@ -97,5 +98,6 @@ func StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, p
kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv)
// Your code here
return []tester.IService{kv, kv.rsm.Raft()}
}

View File

@ -5,26 +5,12 @@ import (
"6.5840/shardkv1/shardcfg"
)
// Same as Put in kvsrv1/rpc, but with a configuration number
type PutArgs struct {
Key string
Value string
Version rpc.Tversion
Num shardcfg.Tnum
}
// Same as Get in kvsrv1/rpc, but with a configuration number.
type GetArgs struct {
Key string
Num shardcfg.Tnum
}
type FreezeShardArgs struct {
Shard shardcfg.Tshid
Num shardcfg.Tnum
}
type FreezeShardReply struct {
State []byte
Num shardcfg.Tnum
Err rpc.Err

View File

@ -1,7 +1,7 @@
package shardkv
import (
"log"
//"log"
"testing"
"time"
@ -9,7 +9,6 @@ import (
"6.5840/kvtest1"
"6.5840/shardkv1/shardcfg"
"6.5840/shardkv1/shardctrler"
"6.5840/shardkv1/shardctrler/param"
"6.5840/tester1"
)
@ -28,7 +27,7 @@ func TestInitQuery5A(t *testing.T) {
defer ts.Cleanup()
// Make a shard controller
sck := shardctrler.MakeShardCtrler(ts.Config.MakeClient())
// Make an empty shard configuration
scfg := shardcfg.MakeShardConfig()
@ -41,9 +40,9 @@ func TestInitQuery5A(t *testing.T) {
sck.InitConfig(scfg)
// Read the initial configuration and check it
cfg := sck.Query()
if cfg.Num != 1 || cfg.Shards[0] != shardcfg.Gid1 {
ts.t.Fatalf("Static wrong %v", cfg)
}
cfg.CheckConfig(t, []tester.Tgid{shardcfg.Gid1})
}
@ -68,7 +67,7 @@ func TestStaticOneShardGroup5A(t *testing.T) {
// disconnect raft leader of shardgrp and check that keys are
// still available
ts.disconnectClntFromLeader(shardcfg.Gid1)
for i := 0; i < n; i++ {
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts
@ -87,14 +86,14 @@ func TestJoinBasic5A(t *testing.T) {
ka, va := ts.SpreadPuts(ck, NKEYS)
sck := ts.ShardCtrler()
cfg := sck.Query()
gid2 := ts.newGid()
if ok := ts.joinGroups(sck, []tester.Tgid{gid2}); !ok {
ts.t.Fatalf("TestJoinBasic5A: joinGroups failed")
}
cfg1 := sck.Query()
if cfg.Num+1 != cfg1.Num {
ts.t.Fatalf("TestJoinBasic5A: wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
}
@ -270,7 +269,7 @@ func TestShutdown5A(t *testing.T) {
// Test that Gets for keys at groups that are alive
// return
func TestProgressShutdown5A(t *testing.T) {
const (
NJOIN = 4
NSEC = 2
@ -299,7 +298,7 @@ func TestProgressShutdown(t *testing.T) {
alive[g] = true
}
cfg := sck.Query()
ch := make(chan rpc.Err)
go func() {
@ -322,7 +321,7 @@ func TestProgressShutdown(t *testing.T) {
}
// Test that Gets from a non-moving shard return quickly
func TestProgressJoin5A(t *testing.T) {
const (
NJOIN = 4
NSEC = 4
@ -341,7 +340,7 @@ func TestProgressJoin(t *testing.T) {
grps := ts.groups(NJOIN)
ts.joinGroups(sck, grps)
cfg := sck.Query()
newcfg := cfg.Copy()
newgid := tester.Tgid(NJOIN + 3)
if ok := newcfg.JoinBalance(map[tester.Tgid][]string{newgid: []string{"xxx"}}); !ok {
@ -409,7 +408,7 @@ func TestProgressJoin(t *testing.T) {
select {
case cnt := <-ch1:
log.Printf("cnt %d", cnt)
//log.Printf("cnt %d", cnt)
if cnt < NCNT {
ts.Fatalf("Two few gets finished %d; expected more than %d", cnt, NCNT)
}
@ -492,7 +491,7 @@ func TestJoinLeave5B(t *testing.T) {
ka, va := ts.SpreadPuts(ck, NKEYS)
sck := ts.ShardCtrler()
cfg := sck.Query()
ts.Group(gid1).Shutdown()
@ -521,7 +520,7 @@ func TestJoinLeave5B(t *testing.T) {
ts.Fatalf("Join didn't complete")
}
cfg1 := sck.Query()
if cfg.Num+1 != cfg1.Num {
ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
}
@ -569,140 +568,28 @@ func TestRecoverCtrler5B(t *testing.T) {
ka, va := ts.SpreadPuts(ck, NKEYS)
for i := 0; i < NPARTITION; i++ {
ts.partitionCtrler(ck, gid, ka, va)
}
}
// Test concurrent ctrlers fighting for leadership reliable
func TestConcurrentReliable5C(t *testing.T) {
ts := MakeTestLeases(t, "Test (5C): Concurrent ctrlers ...", true)
defer ts.Cleanup()
ts.setupKVService()
ck := ts.MakeClerk()
ka, va := ts.SpreadPuts(ck, NKEYS)
ts.concurCtrler(ck, ka, va)
}
// Test concurrent ctrlers fighting for leadership unreliable
func TestAcquireLockConcurrentUnreliable5C(t *testing.T) {
ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers acquiring leadership ...", false)
ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers ...", false)
defer ts.Cleanup()
ts.setupKVService()
ck := ts.MakeClerk()
ka, va := ts.SpreadPuts(ck, NKEYS)
ts.concurCtrler(ck, ka, va)
}
// Test that ReleaseLock allows a new leader to start quickly
func TestLeaseBasicRelease5C(t *testing.T) {
ts := MakeTestLeases(t, "Test (5C): release lease ...", true)
defer ts.Cleanup()
ts.setupKVService()
sck0, clnt0 := ts.makeShardCtrlerClnt()
go func() {
sck0.InitController()
time.Sleep(200 * time.Millisecond)
sck0.ExitController()
}()
time.Sleep(10 * time.Millisecond)
// start new controller
sck1, clnt1 := ts.makeShardCtrlerClnt()
ch := make(chan struct{})
go func() {
sck1.InitController()
time.Sleep(200 * time.Millisecond)
sck1.ExitController()
ch <- struct{}{}
}()
select {
case <-time.After(1 * time.Second):
ts.Fatalf("Release didn't give up leadership")
case <-ch:
}
ts.Config.DeleteClient(clnt0)
ts.Config.DeleteClient(clnt1)
}
// Test lease expiring
func TestLeaseBasicExpire5C(t *testing.T) {
ts := MakeTestLeases(t, "Test (5C): lease expiring ...", true)
defer ts.Cleanup()
ts.setupKVService()
sck0, clnt0 := ts.makeShardCtrlerClnt()
go func() {
sck0.InitController()
for {
time.Sleep(10 * time.Millisecond)
}
}()
time.Sleep(100 * time.Millisecond)
// partition sck0 forever
clnt0.DisconnectAll()
// start new controller
sck1, clnt1 := ts.makeShardCtrlerClnt()
ch := make(chan struct{})
go func() {
sck1.InitController()
time.Sleep(100 * time.Millisecond)
sck1.ExitController()
ch <- struct{}{}
}()
select {
case <-time.After((param.LEASETIMESEC + 1) * time.Second):
ts.Fatalf("Lease didn't expire")
case <-ch:
}
ts.Config.DeleteClient(clnt0)
ts.Config.DeleteClient(clnt1)
}
// Test lease is being extended
func TestLeaseBasicRefresh5C(t *testing.T) {
const LEADERSEC = 3
ts := MakeTestLeases(t, "Test (5C): lease refresh ...", true)
defer ts.Cleanup()
ts.setupKVService()
sck0, clnt0 := ts.makeShardCtrlerClnt()
go func() {
sck0.InitController()
time.Sleep(LEADERSEC * param.LEASETIMESEC * time.Second)
sck0.ExitController()
}()
// give sck0 time to become leader
time.Sleep(100 * time.Millisecond)
// start new controller
sck1, clnt1 := ts.makeShardCtrlerClnt()
ch := make(chan struct{})
go func() {
sck1.InitController()
time.Sleep(100 * time.Millisecond)
sck1.ExitController()
ch <- struct{}{}
}()
select {
case <-time.After((LEADERSEC + param.LEASETIMESEC + 1) * time.Second):
case <-ch:
ts.Fatalf("Lease not refreshed")
}
ts.Config.DeleteClient(clnt0)
ts.Config.DeleteClient(clnt1)
}
// Test if old leader is fenced off when reconnecting while it is in
@ -710,6 +597,7 @@ func TestLeaseBasicRefresh5C(t *testing.T) {
func TestPartitionControllerJoin5C(t *testing.T) {
const (
NSLEEP = 2
NSEC = 1
RAND = 1000
)
@ -739,8 +627,8 @@ func TestPartitionControllerJoin5C(t *testing.T) {
// partition sck
clnt.DisconnectAll()
// wait a while before restarting shardgrp `ngid`
time.Sleep(NSEC * time.Second)
ts.Group(ngid).StartServers()
@ -748,13 +636,11 @@ func TestPartitionControllerJoin5C(t *testing.T) {
sck0 := ts.makeShardCtrler()
sck0.InitController()
scfg := sck0.Query()
if !scfg.IsMember(ngid) {
t.Fatalf("Didn't recover gid %d", ngid)
}
sck0.ExitController()
// reconnect old controller, which shouldn't finish ChangeConfigTo
clnt.ConnectAll()
@ -795,7 +681,7 @@ func partitionRecovery5C(t *testing.T, reliable bool, npart, nclnt int) {
}
for i := 0; i < npart; i++ {
ts.partitionCtrler(ck, gid, ka, va)
}
if nclnt > 0 {
@ -827,7 +713,7 @@ func TestPartitionRecoveryReliableClerks5C(t *testing.T) {
func TestPartitionRecoveryUnreliableClerks5C(t *testing.T) {
const (
NPARTITION = 3
)
partitionRecovery5C(t, false, NPARTITION, 5)
}

View File

@ -16,7 +16,6 @@ import (
"6.5840/labrpc"
"6.5840/shardkv1/shardcfg"
"6.5840/shardkv1/shardctrler"
"6.5840/shardkv1/shardctrler/param"
"6.5840/shardkv1/shardgrp"
"6.5840/tester1"
)
@ -25,9 +24,9 @@ type Test struct {
t *testing.T
*kvtest.Test
sck *shardctrler.ShardCtrler
part string
partition bool
maxraftstate int
mu sync.Mutex
@ -41,11 +40,11 @@ const (
)
// Setup kvserver for the shard controller and make the controller
func MakeTestMaxRaft(t *testing.T, part string, reliable, partition bool, maxraftstate int) *Test {
ts := &Test{
ngid: shardcfg.Gid1 + 1, // Gid1 is in use
t: t,
partition: partition,
maxraftstate: maxraftstate,
}
cfg := tester.MakeConfig(t, 1, reliable, kvsrv.StartKVServer)
@ -86,7 +85,7 @@ func (ts *Test) makeShardCtrler() *shardctrler.ShardCtrler {
func (ts *Test) makeShardCtrlerClnt() (*shardctrler.ShardCtrler, *tester.Clnt) {
clnt := ts.Config.MakeClient()
return shardctrler.MakeShardCtrler(clnt), clnt
}
func (ts *Test) makeKVClerk() *kvsrv.Clerk {
@ -128,14 +127,14 @@ func (ts *Test) StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid
}
func (ts *Test) checkMember(sck *shardctrler.ShardCtrler, gid tester.Tgid) bool {
cfg := sck.Query()
ok := cfg.IsMember(gid)
return ok
}
// Add group gid
func (ts *Test) join(sck *shardctrler.ShardCtrler, gid tester.Tgid, srvs []string) {
cfg := sck.Query()
newcfg := cfg.Copy()
ok := newcfg.JoinBalance(map[tester.Tgid][]string{gid: srvs})
if !ok {
@ -158,7 +157,7 @@ func (ts *Test) joinGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) boo
// Group gid leaves.
func (ts *Test) leave(sck *shardctrler.ShardCtrler, gid tester.Tgid) {
cfg := sck.Query()
newcfg := cfg.Copy()
ok := newcfg.LeaveBalance([]tester.Tgid{gid})
if !ok {
@ -179,25 +178,12 @@ func (ts *Test) leaveGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) bo
return true
}
func (ts *Test) disconnectRaftLeader(gid tester.Tgid) (int, string) {
_, l := rsm.Leader(ts.Config, gid)
g := ts.Group(gid)
ln := g.SrvName(l)
g.DisconnectAll(l)
return l, ln
}
func (ts *Test) reconnectOldLeader(gid tester.Tgid, l int) {
g := ts.Group(gid)
g.ConnectOne(l)
}
func (ts *Test) disconnectClntFromLeader(clnt *tester.Clnt, gid tester.Tgid) int {
l, ln := ts.disconnectRaftLeader(gid)
p := ts.Group(gid).AllowServersExcept(l)
srvs := ts.Group(gid).SrvNamesTo(p)
clnt.Disconnect(ln)
clnt.ConnectTo(srvs)
func (ts *Test) disconnectClntFromLeader(gid tester.Tgid) int {
ok, l := rsm.Leader(ts.Config, gid)
if !ok {
log.Fatalf("Leader failed")
}
ts.Group(gid).ShutdownServer(l)
return l
}
@ -252,15 +238,14 @@ func (ts *Test) checkShutdownSharding(down tester.Tgid, ka []string, va []string
// Run one controller and then partition it after some time. Run
// another ctrler that must finish the first ctrler's unfinished
// shard moves. To make it likely that first ctrler is in a join/leave
// the test shuts down shardgrp `gid`. After the second controller is
// done, heal the partition. partitionCtrler returns once recovery has
// happened.
func (ts *Test) partitionCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
const (
NSLEEP = 2
RAND = 400
NSEC = 1
JOIN = 1
LEAVE = 2
@ -269,7 +254,7 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string)
sck, clnt := ts.makeShardCtrlerClnt()
sck.InitController()
cfg := ts.ShardCtrler().Query()
num := cfg.Num
state := 0
@ -283,50 +268,57 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string)
state = LEAVE
ts.leaveGroups(sck, []tester.Tgid{ngid})
} else {
//log.Printf("deposed")
//log.Printf("%v: deposed", sck.Id())
return
}
}
}()
r := rand.Int() % RAND
d := time.Duration(r) * time.Millisecond
time.Sleep(d)
//log.Printf("shutdown gid %d after %dms %v", gid, r, time.Now().Sub(t))
ts.Group(gid).Shutdown()
// sleep for a while to get sck stuck in join or leave, because
// gid is down
time.Sleep(1000 * time.Millisecond)
//log.Printf("disconnect sck %v ngid %d num %d state %d", d, ngid, num, state)
// partition controller
clnt.DisconnectAll()
if ts.partition {
// wait a while before restarting shardgrp `gid`
time.Sleep(NSEC * time.Second)
}
//log.Printf("startservers %v lease expired %t", time.Now().Sub(t), ts.leases)
ts.Group(gid).StartServers()
// start new controler to pick up where sck left off
sck0, clnt0 := ts.makeShardCtrlerClnt()
sck0.InitController()
cfg = sck0.Query()
s := "join"
if state == LEAVE {
s = "leave"
}
//log.Printf("%v cfg %v recovered %s", s, cfg, s)
if cfg.Num <= num {
ts.Fatalf("didn't recover; expected %d > %d", num, cfg.Num)
}
//log.Printf("%v: recovered %v %v %v", sck0.Id(), time.Now().Sub(t), s, cfg)
present := cfg.IsMember(ngid)
if (state == JOIN && !present) || (state == LEAVE && present) {
ts.Fatalf("didn't recover %d correctly after %v", ngid, s)
@ -337,32 +329,30 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string)
ts.leaveGroups(sck0, []tester.Tgid{ngid})
}
for i := 0; i < len(ka); i++ {
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
}
sck0.ExitController()
//log.Printf("reconnect old controller")
if ts.partition {
// reconnect old controller, which should bail out, because
// it has been superseded.
clnt.ConnectAll()
time.Sleep(100 * time.Millisecond)
for i := 0; i < len(ka); i++ {
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
}
}
//log.Printf("reconnected %v", time.Now().Sub(t))
for i := 0; i < len(ka); i++ {
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
}
//log.Printf("done get %v", time.Now().Sub(t))
ts.Config.DeleteClient(clnt)
ts.Config.DeleteClient(clnt0)
}
func (ts *Test) concurCtrler(ck kvtest.IKVClerk, ka, va []string) {
const (
NSEC = 2
N = 4
)
@ -376,16 +366,16 @@ func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
ngid := ts.newGid()
sck := ts.makeShardCtrler()
sck.InitController()
//log.Printf("%d(%p): join/leave %v", i, sck, ngid)
//log.Printf("%v: electCtrler %d join/leave %v", sck.Id(), i, ngid)
ts.joinGroups(sck, []tester.Tgid{ngid})
if ok := ts.checkMember(sck, ngid); ok {
//log.Printf("%v: electCtrler %d leave %d", sck.Id(), i, ngid)
if ok := ts.leaveGroups(sck, []tester.Tgid{ngid}); !ok {
log.Fatalf("electCtrler: %d(%p): leave %v failed", i, sck, ngid)
//log.Printf("%v: electCtrler %d leave %v failed", sck.Id(), i, ngid)
}
} else {
log.Fatalf("electCtrler: %d(%p): join %v failed", i, sck, ngid)
//log.Printf("%v: electCtrler %d join %v failed", sck.Id(), i, ngid)
}
sck.ExitController()
}
}
}
@ -399,6 +389,7 @@ func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
for i := 0; i < N; i++ {
ch <- struct{}{}
}
for i := 0; i < len(ka); i++ {
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
}

View File

@ -1,169 +0,0 @@
=== RUN TestAcquireLockConcurrentReliable5C
Test (5C): Concurent ctrlers acquiring leadership ... (reliable network)...
2025/03/20 09:18:33 PecUxIPV: acquire success 1 l 2.999731394s
2025/03/20 09:18:35 PecUxIPV: Release 3
2025/03/20 09:18:35 aKDBLFuF: acquire success 4 l 2.999504542s
2025/03/20 09:18:37 aKDBLFuF: Release 6
2025/03/20 09:18:37 HxhaFlAP: acquire success 7 l 2.999622621s
2025/03/20 09:18:39 HxhaFlAP: Release 9
2025/03/20 09:18:39 LpTmFCGC: acquire success 10 l 2.999747179s
2025/03/20 09:18:41 LpTmFCGC: Release 13
2025/03/20 09:18:41 klmldUQn: acquire success 14 l 2.999558604s
2025/03/20 09:18:43 klmldUQn: Release 17
2025/03/20 09:18:43 AWgiWKPZ: acquire success 18 l 2.999701903s
2025/03/20 09:18:46 AWgiWKPZ: Release 21
... Passed -- 16.4 1 2061 120
--- PASS: TestAcquireLockConcurrentReliable5C (16.38s)
=== RUN TestAcquireLockConcurrentUnreliable5C
Test (5C): Concurent ctrlers acquiring leadership ... (unreliable network)...
2025/03/20 09:19:00 xulPPlwd: acquire success 2 l 2.768860613s
2025/03/20 09:19:05 xulPPlwd: Release 6
2025/03/20 09:19:05 SGXgIJeR: acquire success 7 l 2.984694448s
2025/03/20 09:19:08 SGXgIJeR: Release 11
2025/03/20 09:19:08 kNvktGla: acquire success 12 l 2.986135242s
2025/03/20 09:19:13 kNvktGla: Release 17
2025/03/20 09:19:13 usGKuyeI: acquire success 18 l 2.97484218s
2025/03/20 09:19:19 usGKuyeI: Release 24
... Passed -- 38.4 1 2226 120
--- PASS: TestAcquireLockConcurrentUnreliable5C (38.37s)
=== RUN TestLeaseBasicRelease5C
Test (5C): release lease ... (reliable network)...
2025/03/20 09:19:25 fWllyjFs: acquire success 1 l 2.999778852s
2025/03/20 09:19:25 fWllyjFs: Release 2
2025/03/20 09:19:25 HqoctgYf: acquire success 3 l 2.999623311s
2025/03/20 09:19:26 HqoctgYf: Release 4
... Passed -- 0.4 1 17 0
--- PASS: TestLeaseBasicRelease5C (0.42s)
=== RUN TestLeaseBasicExpire5C
Test (5C): lease expiring ... (reliable network)...
2025/03/20 09:19:26 MgmIiwHw: acquire success 1 l 2.999622077s
2025/03/20 09:19:29 PviuBaqZ: acquire: MgmIiwHw lease expired -31.512117ms
2025/03/20 09:19:29 PviuBaqZ: acquire success 2 l 2.9996929s
2025/03/20 09:19:29 PviuBaqZ: Release 3
... Passed -- 3.1 1 81 0
--- PASS: TestLeaseBasicExpire5C (3.14s)
=== RUN TestLeaseBasicRefresh5C
Test (5C): lease refresh ... (reliable network)...
2025/03/20 09:19:29 CqhHcMdl: acquire success 1 l 2.999690343s
... Passed -- 7.1 1 144 0
--- PASS: TestLeaseBasicRefresh5C (7.10s)
=== RUN TestPartitionControllerJoin5C
Test (5C): partition controller in join... (reliable network)...
2025/03/20 09:19:38 CqhHcMdl: Release 9
2025/03/20 09:19:38 QykadXGi: acquire success 1 l 2.999763148s
2025/03/20 09:19:43 YWktoCTH: acquire: QykadXGi lease expired -2.003411436s
2025/03/20 09:19:43 YWktoCTH: acquire success 2 l 2.999580573s
2025/03/20 09:19:45 YWktoCTH: Release 4
2025/03/20 09:19:45 QykadXGi: refresher: exit expired -3.255782562s
... Passed -- 11.2 1 1011 120
--- PASS: TestPartitionControllerJoin5C (11.22s)
=== RUN TestPartitionRecoveryReliableNoClerk5C
Test (5C): controllers with leased leadership ... (reliable network)...
2025/03/20 09:19:50 nLqpPYYg: acquire success 1 l 2.999773699s
2025/03/20 09:19:56 Yauplngb: acquire: nLqpPYYg lease expired -1.030252686s
2025/03/20 09:19:56 Yauplngb: acquire success 4 l 2.999760357s
2025/03/20 09:19:58 Yauplngb: Release 7
2025/03/20 09:19:58 nLqpPYYg: refresher: exit expired -3.848348135s
2025/03/20 09:20:00 dsvADejV: acquire success 8 l 2.999675453s
2025/03/20 09:20:07 jdcPVdvf: acquire: dsvADejV lease expired -1.56610473s
2025/03/20 09:20:07 jdcPVdvf: acquire success 11 l 2.999839821s
2025/03/20 09:20:10 jdcPVdvf: Release 15
2025/03/20 09:20:10 dsvADejV: refresher: exit expired -4.604218577s
2025/03/20 09:20:12 vzVcVtTQ: acquire success 16 l 2.999743618s
2025/03/20 09:20:19 valCDRmB: acquire: vzVcVtTQ lease expired -1.988170854s
2025/03/20 09:20:19 valCDRmB: acquire success 19 l 2.999667662s
2025/03/20 09:20:22 valCDRmB: Release 22
2025/03/20 09:20:22 vzVcVtTQ: refresher: exit expired -4.943386258s
2025/03/20 09:20:23 RJYqYuLF: acquire success 23 l 2.999774783s
2025/03/20 09:20:30 KaeJpVvL: acquire: RJYqYuLF lease expired -1.222157296s
2025/03/20 09:20:30 KaeJpVvL: acquire success 26 l 2.999897268s
2025/03/20 09:20:33 KaeJpVvL: Release 30
2025/03/20 09:20:33 RJYqYuLF: refresher: exit expired -4.429889332s
2025/03/20 09:20:34 leVdobnP: acquire success 31 l 2.999770816s
2025/03/20 09:20:41 DFnmWean: acquire: leVdobnP lease expired -1.756292497s
2025/03/20 09:20:41 DFnmWean: acquire success 34 l 2.999905276s
2025/03/20 09:20:44 DFnmWean: Release 38
2025/03/20 09:20:44 leVdobnP: refresher: exit expired -4.84260629s
... Passed -- 59.3 1 5454 660
--- PASS: TestPartitionRecoveryReliableNoClerk5C (59.30s)
=== RUN TestPartitionRecoveryUnreliableNoClerk5C
Test (5C): controllers with leased leadership ... (unreliable network)...
2025/03/20 09:21:01 oBRWPJFn: acquire success 1 l 2.999668901s
2025/03/20 09:21:08 WCfEtCSF: acquire: oBRWPJFn lease expired -1.960469635s
2025/03/20 09:21:08 WCfEtCSF: acquire success 4 l 2.989064006s
2025/03/20 09:21:19 WCfEtCSF: Release 15
2025/03/20 09:21:20 oBRWPJFn: refresher: exit expired -13.623366094s
2025/03/20 09:21:25 BKOHUPgK: acquire success 16 l 2.974368151s
2025/03/20 09:21:32 dpZEDTAn: acquire: BKOHUPgK lease expired -1.266079689s
2025/03/20 09:21:32 dpZEDTAn: acquire success 19 l 2.986737971s
2025/03/20 09:21:42 dpZEDTAn: Release 29
2025/03/20 09:21:43 BKOHUPgK: refresher: exit expired -12.408069097s
2025/03/20 09:21:50 TiapOztE: acquire: dpZEDTAn lease expired -4.992859225s
2025/03/20 09:21:50 TiapOztE: acquire success 30 l 2.972701594s
2025/03/20 09:21:57 aDyCYcpR: acquire: TiapOztE lease expired -1.338848496s
2025/03/20 09:21:57 aDyCYcpR: acquire success 33 l 2.99686939s
2025/03/20 09:22:07 aDyCYcpR: Release 43
2025/03/20 09:22:07 TiapOztE: refresher: exit expired -12.147734461s
... Passed -- 86.9 1 4985 420
--- PASS: TestPartitionRecoveryUnreliableNoClerk5C (86.88s)
=== RUN TestPartitionRecoveryReliableClerks5C
Test (5C): controllers with leased leadership ... (reliable network)...
2025/03/20 09:22:13 vZrMwEsy: acquire success 1 l 2.999893567s
2025/03/20 09:22:20 AFHDpDYV: acquire: vZrMwEsy lease expired -1.657500925s
2025/03/20 09:22:20 AFHDpDYV: acquire success 4 l 2.999596975s
2025/03/20 09:22:22 AFHDpDYV: Release 6
2025/03/20 09:22:22 vZrMwEsy: refresher: exit expired -3.627083489s
2025/03/20 09:22:23 tserHLNb: acquire success 7 l 2.999932478s
2025/03/20 09:22:29 msIfUgIC: acquire: tserHLNb lease expired -1.13789373s
2025/03/20 09:22:29 msIfUgIC: acquire success 10 l 2.999755401s
2025/03/20 09:22:31 msIfUgIC: Release 12
2025/03/20 09:22:31 tserHLNb: refresher: exit expired -3.083945752s
2025/03/20 09:22:32 YLEIZyDn: acquire success 13 l 2.999940475s
2025/03/20 09:22:38 TIibzsMc: acquire: YLEIZyDn lease expired -1.017825561s
2025/03/20 09:22:38 TIibzsMc: acquire success 16 l 2.999907075s
2025/03/20 09:22:40 TIibzsMc: Release 18
2025/03/20 09:22:40 YLEIZyDn: refresher: exit expired -2.789136907s
2025/03/20 09:22:41 knOnYtxW: acquire success 19 l 2.999891429s
2025/03/20 09:22:47 KyiPMsgB: acquire: knOnYtxW lease expired -1.534324297s
2025/03/20 09:22:47 KyiPMsgB: acquire success 22 l 2.999822725s
2025/03/20 09:22:49 KyiPMsgB: Release 24
2025/03/20 09:22:49 knOnYtxW: refresher: exit expired -3.516354686s
2025/03/20 09:22:50 wHNCImkl: acquire success 25 l 2.999917928s
2025/03/20 09:22:56 CSBcxnyr: acquire: wHNCImkl lease expired -1.051161379s
2025/03/20 09:22:56 CSBcxnyr: acquire success 28 l 2.999745303s
2025/03/20 09:22:58 CSBcxnyr: Release 31
2025/03/20 09:22:58 wHNCImkl: refresher: exit expired -3.241024197s
... Passed -- 60.1 1 15934 5124
--- PASS: TestPartitionRecoveryReliableClerks5C (60.14s)
=== RUN TestPartitionRecoveryUnreliableClerks5C
Test (5C): controllers with leased leadership ... (unreliable network)...
2025/03/20 09:23:14 ydfNYYir: acquire success 1 l 2.871807366s
2025/03/20 09:23:21 KmfOaYym: acquire: ydfNYYir lease expired -1.96910688s
2025/03/20 09:23:21 KmfOaYym: acquire success 4 l 2.976357121s
2025/03/20 09:23:25 KmfOaYym: Release 9
2025/03/20 09:23:26 ydfNYYir: refresher: exit expired -6.960801287s
2025/03/20 09:23:27 XErxjiqb: acquire success 10 l 2.994288153s
2025/03/20 09:23:34 VQFBAKED: acquire: XErxjiqb lease expired -1.186993995s
2025/03/20 09:23:34 VQFBAKED: acquire success 14 l 2.978008397s
2025/03/20 09:23:40 VQFBAKED: Release 20
2025/03/20 09:23:40 XErxjiqb: refresher: exit expired -7.422563867s
2025/03/20 09:23:41 IqJHVjsW: acquire success 21 l 2.984528802s
2025/03/20 09:23:47 NAaIOMcb: acquire: IqJHVjsW lease expired -1.19246442s
2025/03/20 09:23:48 NAaIOMcb: acquire success 25 l 2.521727902s
2025/03/20 09:23:53 NAaIOMcb: Release 30
2025/03/20 09:23:53 IqJHVjsW: refresher: exit expired -7.130118022s
2025/03/20 09:23:54 pwTkolYO: acquire success 32 l 2.761741697s
2025/03/20 09:24:01 GAueeCFX: acquire: pwTkolYO lease expired -1.496813006s
2025/03/20 09:24:01 GAueeCFX: acquire success 34 l 2.977558093s
2025/03/20 09:24:06 GAueeCFX: Release 39
2025/03/20 09:24:06 pwTkolYO: refresher: exit expired -6.843949894s
2025/03/20 09:24:09 FIDtQSlF: acquire: GAueeCFX lease expired -15.508321ms
2025/03/20 09:24:09 FIDtQSlF: acquire success 40 l 2.998464382s
2025/03/20 09:24:16 wExaLSov: acquire: FIDtQSlF lease expired -1.874162487s
2025/03/20 09:24:16 wExaLSov: acquire success 43 l 2.860519358s
2025/03/20 09:24:21 wExaLSov: Release 48
2025/03/20 09:24:21 FIDtQSlF: refresher: exit expired -6.924846198s
... Passed -- 68.4 1 10469 1186
--- PASS: TestPartitionRecoveryUnreliableClerks5C (68.39s)
PASS
ok 6.5840/shardkv1 351.349s

View File

@ -138,7 +138,7 @@ func (cfg *Config) End() {
ops := atomic.LoadInt32(&cfg.ops) // number of clerk get/put/append calls
fmt.Printf(" ... Passed --")
fmt.Printf(" %4.1f %d %5d %4d\n", t, npeers, nrpc, ops)
fmt.Printf(" time %4.1fs #peers %d #RPCs %5d #Ops %4d\n", t, npeers, nrpc, ops)
}
}

View File

@ -299,19 +299,6 @@ func (sg *ServerGrp) MakePartition(l int) ([]int, []int) {
return p1, p2
}
func (sg *ServerGrp) AllowServersExcept(l int) []int {
n := len(sg.srvs) - 1
p := make([]int, n)
j := 0
for i, _ := range sg.srvs {
if i != l {
p[j] = i
j++
}
}
return p
}
func (sg *ServerGrp) Partition(p1 []int, p2 []int) {
//log.Printf("partition servers into: %v %v\n", p1, p2)
for i := 0; i < len(p1); i++ {