Compare commits
10 Commits
958a4d45b7
...
9d4d24dc3e
| Author | SHA1 | Date | |
|---|---|---|---|
| 9d4d24dc3e | |||
|
|
fa6877b02d | ||
|
|
d21b9ffe74 | ||
|
|
059b435561 | ||
|
|
c96bad59a5 | ||
|
|
8743e4fb99 | ||
|
|
bb597de980 | ||
|
|
8f21c11cfc | ||
|
|
05dc50ab27 | ||
|
|
7ba7e45595 |
@ -32,7 +32,7 @@ func main() {
|
||||
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
|
||||
p, err := plugin.Open(filename)
|
||||
if err != nil {
|
||||
log.Fatalf("cannot load plugin %v", filename)
|
||||
log.Fatalf("cannot load plugin %v: %s", filename, err)
|
||||
}
|
||||
xmapf, err := p.Lookup("Map")
|
||||
if err != nil {
|
||||
|
||||
@ -5,11 +5,42 @@ import "net"
|
||||
import "os"
|
||||
import "net/rpc"
|
||||
import "net/http"
|
||||
import "sync"
|
||||
|
||||
type status uint8
|
||||
|
||||
const (
|
||||
idle status = iota
|
||||
progress
|
||||
completed
|
||||
)
|
||||
|
||||
type Coordinator struct {
|
||||
// Your definitions here.
|
||||
// Your definitions here.
|
||||
mapJobStatus map[string]status
|
||||
reduceJobStatus map[string]status
|
||||
|
||||
workerStatus map[int]status
|
||||
|
||||
intermediateFiles map[string][]string
|
||||
|
||||
nReduce int
|
||||
|
||||
workerCount int
|
||||
|
||||
m sync.Mutex
|
||||
}
|
||||
|
||||
func (c *Coordinator) areMapJobsDone() bool {
|
||||
done := true
|
||||
|
||||
for _, job := range c.mapJobStatus {
|
||||
if job != completed {
|
||||
done = false
|
||||
}
|
||||
}
|
||||
|
||||
return done
|
||||
}
|
||||
|
||||
// Your code here -- RPC handlers for the worker to call.
|
||||
@ -20,25 +51,73 @@ type Coordinator struct {
|
||||
// the RPC argument and reply types are defined in rpc.go.
|
||||
//
|
||||
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
|
||||
reply.Y = args.X + 1
|
||||
return nil
|
||||
reply.Y = args.X + 1
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Coordinator) Register(args *RegisterArgs, reply *RegisterReply) error {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
reply.WorkerId = c.workerCount
|
||||
c.workerCount += 1
|
||||
|
||||
c.workerStatus[reply.WorkerId] = idle
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Coordinator) GetJob(args *GetJobArgs, reply *GetJobReply) error {
|
||||
c.m.Lock()
|
||||
c.m.Unlock()
|
||||
|
||||
if !c.areMapJobsDone() {
|
||||
for filename, jobStatus := range c.mapJobStatus {
|
||||
if jobStatus == idle {
|
||||
var job MapJob
|
||||
job.InputFile = filename
|
||||
job.ReducerCount = c.nReduce
|
||||
c.mapJobStatus[filename] = progress
|
||||
|
||||
reply.Finished = false
|
||||
reply.JobType = Map
|
||||
reply.MapJob = job
|
||||
|
||||
c.workerStatus[args.WorkerId] = progress
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Coordinator) Finish(args *MapResult, reply *FinishReply) error {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
c.intermediateFiles[args.InputFile] = args.IntermediateFiles
|
||||
c.mapJobStatus[args.InputFile] = completed
|
||||
c.workerStatus[args.WorkerId] = idle
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//
|
||||
// start a thread that listens for RPCs from worker.go
|
||||
//
|
||||
func (c *Coordinator) server() {
|
||||
rpc.Register(c)
|
||||
rpc.HandleHTTP()
|
||||
//l, e := net.Listen("tcp", ":1234")
|
||||
sockname := coordinatorSock()
|
||||
os.Remove(sockname)
|
||||
l, e := net.Listen("unix", sockname)
|
||||
if e != nil {
|
||||
log.Fatal("listen error:", e)
|
||||
}
|
||||
go http.Serve(l, nil)
|
||||
rpc.Register(c)
|
||||
rpc.HandleHTTP()
|
||||
//l, e := net.Listen("tcp", ":1234")
|
||||
sockname := coordinatorSock()
|
||||
os.Remove(sockname)
|
||||
l, e := net.Listen("unix", sockname)
|
||||
if e != nil {
|
||||
log.Fatal("listen error:", e)
|
||||
}
|
||||
go http.Serve(l, nil)
|
||||
}
|
||||
|
||||
//
|
||||
@ -46,12 +125,12 @@ func (c *Coordinator) server() {
|
||||
// if the entire job has finished.
|
||||
//
|
||||
func (c *Coordinator) Done() bool {
|
||||
ret := false
|
||||
ret := false
|
||||
|
||||
// Your code here.
|
||||
// Your code here.
|
||||
|
||||
|
||||
return ret
|
||||
return ret
|
||||
}
|
||||
|
||||
//
|
||||
@ -60,11 +139,19 @@ func (c *Coordinator) Done() bool {
|
||||
// nReduce is the number of reduce tasks to use.
|
||||
//
|
||||
func MakeCoordinator(files []string, nReduce int) *Coordinator {
|
||||
c := Coordinator{}
|
||||
c := Coordinator{}
|
||||
|
||||
// Your code here.
|
||||
// Your code here.
|
||||
c.mapJobStatus = make(map[string]status)
|
||||
c.intermediateFiles = make(map[string][]string)
|
||||
c.workerStatus = make(map[int]status)
|
||||
c.nReduce = nReduce
|
||||
|
||||
for _, v := range files {
|
||||
c.mapJobStatus[v] = idle
|
||||
c.intermediateFiles[v] = make([]string, nReduce)
|
||||
}
|
||||
|
||||
c.server()
|
||||
return &c
|
||||
c.server()
|
||||
return &c
|
||||
}
|
||||
|
||||
@ -15,22 +15,70 @@ import "strconv"
|
||||
//
|
||||
|
||||
type ExampleArgs struct {
|
||||
X int
|
||||
X int
|
||||
}
|
||||
|
||||
type ExampleReply struct {
|
||||
Y int
|
||||
Y int
|
||||
}
|
||||
|
||||
// Add your RPC definitions here.
|
||||
|
||||
type JobType uint8
|
||||
const (
|
||||
Map JobType = iota
|
||||
Reduce
|
||||
)
|
||||
|
||||
type MapJob struct {
|
||||
//Index int
|
||||
InputFile string
|
||||
ReducerCount int
|
||||
//IntermediateFiles []string
|
||||
}
|
||||
|
||||
type ReduceJob struct {
|
||||
ReducerNumber int
|
||||
IntermediateFiles []string
|
||||
}
|
||||
|
||||
type MapResult struct {
|
||||
InputFile string
|
||||
IntermediateFiles []string
|
||||
WorkerId int
|
||||
}
|
||||
|
||||
type RegisterArgs struct {
|
||||
// empty
|
||||
}
|
||||
|
||||
type RegisterReply struct {
|
||||
WorkerId int
|
||||
}
|
||||
|
||||
type GetJobArgs struct {
|
||||
WorkerId int
|
||||
}
|
||||
|
||||
type GetJobReply struct {
|
||||
JobType JobType
|
||||
|
||||
MapJob MapJob
|
||||
ReduceJob ReduceJob
|
||||
|
||||
Finished bool
|
||||
}
|
||||
|
||||
type FinishReply struct {
|
||||
|
||||
}
|
||||
|
||||
// Cook up a unique-ish UNIX-domain socket name
|
||||
// in /var/tmp, for the coordinator.
|
||||
// Can't use the current directory since
|
||||
// Athena AFS doesn't support UNIX-domain sockets.
|
||||
func coordinatorSock() string {
|
||||
s := "/var/tmp/5840-mr-"
|
||||
s += strconv.Itoa(os.Getuid())
|
||||
return s
|
||||
s := "/var/tmp/5840-mr-"
|
||||
s += strconv.Itoa(os.Getuid())
|
||||
return s
|
||||
}
|
||||
|
||||
179
src/mr/worker.go
179
src/mr/worker.go
@ -4,14 +4,16 @@ import "fmt"
|
||||
import "log"
|
||||
import "net/rpc"
|
||||
import "hash/fnv"
|
||||
|
||||
import "os"
|
||||
import "io"
|
||||
import "encoding/json"
|
||||
|
||||
//
|
||||
// Map functions return a slice of KeyValue.
|
||||
//
|
||||
type KeyValue struct {
|
||||
Key string
|
||||
Value string
|
||||
Key string
|
||||
Value string
|
||||
}
|
||||
|
||||
//
|
||||
@ -19,23 +21,91 @@ type KeyValue struct {
|
||||
// task number for each KeyValue emitted by Map.
|
||||
//
|
||||
func ihash(key string) int {
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(key))
|
||||
return int(h.Sum32() & 0x7fffffff)
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(key))
|
||||
return int(h.Sum32() & 0x7fffffff)
|
||||
}
|
||||
|
||||
var id int
|
||||
|
||||
//
|
||||
// main/mrworker.go calls this function.
|
||||
//
|
||||
func Worker(mapf func(string, string) []KeyValue,
|
||||
reducef func(string, []string) string) {
|
||||
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
|
||||
|
||||
// Your worker implementation here.
|
||||
// Your worker implementation here.
|
||||
|
||||
// uncomment to send the Example RPC to the coordinator.
|
||||
// CallExample()
|
||||
registerReply := Register()
|
||||
id = registerReply.WorkerId
|
||||
|
||||
done := false
|
||||
|
||||
for !done {
|
||||
reply := GetJob(GetJobArgs{id})
|
||||
|
||||
if reply.Finished {
|
||||
done = true
|
||||
continue
|
||||
}
|
||||
|
||||
if reply.JobType == Map {
|
||||
doMap(mapf, reply.MapJob)
|
||||
} else if reply.JobType == Reduce {
|
||||
doReduce(reducef, reply.ReduceJob)
|
||||
}
|
||||
}
|
||||
|
||||
// uncomment to send the Example RPC to the coordinator.
|
||||
// CallExample()
|
||||
|
||||
}
|
||||
|
||||
func doMap(mapf func(string, string) []KeyValue, job MapJob) {
|
||||
file, err := os.Open(job.InputFile)
|
||||
if err != nil {
|
||||
log.Fatalf("[Error] Could not open file %s", job.InputFile)
|
||||
}
|
||||
|
||||
content, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
log.Fatalf("[Error] Could not read file %s", job.InputFile)
|
||||
}
|
||||
file.Close()
|
||||
|
||||
kva := mapf(job.InputFile, string(content))
|
||||
|
||||
sort.Sort(ByKey(kva))
|
||||
|
||||
partitions := make([][]KeyValue, job.ReducerCount)
|
||||
|
||||
for _, v := range kva {
|
||||
i := ihash(v.Key) % job.ReducerCount
|
||||
partitions[i] = append(partitions[i], v)
|
||||
}
|
||||
|
||||
intermediateFiles := make([]string, job.ReducerCount)
|
||||
|
||||
for i, v := range partitions {
|
||||
filename := fmt.Sprintf("mr-%v-%v.txt", job.Index, i)
|
||||
file, err := os.Open(filename);
|
||||
if err != nil {
|
||||
log.Fatalf("[Error] Could not open file %s", filename)
|
||||
}
|
||||
|
||||
enc := json.NewEncoder(file)
|
||||
for _, kv := range v {
|
||||
err := enc.Encode(&kv)
|
||||
if err != nil {
|
||||
log.Fatalf("[Error] Could not write to file %s", filename)
|
||||
}
|
||||
}
|
||||
|
||||
intermediateFiles[i] = filename
|
||||
|
||||
file.Close()
|
||||
}
|
||||
|
||||
Finish(MapResult{job.InputFile, intermediateFiles, id})
|
||||
}
|
||||
|
||||
//
|
||||
@ -45,26 +115,53 @@ func Worker(mapf func(string, string) []KeyValue,
|
||||
//
|
||||
func CallExample() {
|
||||
|
||||
// declare an argument structure.
|
||||
args := ExampleArgs{}
|
||||
// declare an argument structure.
|
||||
args := ExampleArgs{}
|
||||
|
||||
// fill in the argument(s).
|
||||
args.X = 99
|
||||
// fill in the argument(s).
|
||||
args.X = 99
|
||||
|
||||
// declare a reply structure.
|
||||
reply := ExampleReply{}
|
||||
// declare a reply structure.
|
||||
reply := ExampleReply{}
|
||||
|
||||
// send the RPC request, wait for the reply.
|
||||
// the "Coordinator.Example" tells the
|
||||
// receiving server that we'd like to call
|
||||
// the Example() method of struct Coordinator.
|
||||
ok := call("Coordinator.Example", &args, &reply)
|
||||
if ok {
|
||||
// reply.Y should be 100.
|
||||
fmt.Printf("reply.Y %v\n", reply.Y)
|
||||
} else {
|
||||
fmt.Printf("call failed!\n")
|
||||
}
|
||||
// send the RPC request, wait for the reply.
|
||||
// the "Coordinator.Example" tells the
|
||||
// receiving server that we'd like to call
|
||||
// the Example() method of struct Coordinator.
|
||||
ok := call("Coordinator.Example", &args, &reply)
|
||||
if ok {
|
||||
// reply.Y should be 100.
|
||||
fmt.Printf("reply.Y %v\n", reply.Y)
|
||||
} else {
|
||||
fmt.Printf("call failed!\n")
|
||||
}
|
||||
|
||||
call("Coordinatior.Wrong", &args, &reply);
|
||||
}
|
||||
|
||||
func Register() RegisterReply {
|
||||
var args RegisterArgs
|
||||
|
||||
var reply RegisterReply
|
||||
|
||||
call("Coordinator.Register", &args, &reply)
|
||||
return reply
|
||||
}
|
||||
|
||||
func GetJob(args GetJobArgs) GetJobReply {
|
||||
var reply GetJobReply
|
||||
|
||||
call("Coordinator.GetJob", &args, &reply)
|
||||
|
||||
return reply
|
||||
}
|
||||
|
||||
func Finish(args MapResult) FinishReply {
|
||||
var reply FinishReply
|
||||
|
||||
call("Coordinator.Finish", &args, &reply)
|
||||
|
||||
return reply
|
||||
}
|
||||
|
||||
//
|
||||
@ -73,19 +170,19 @@ func CallExample() {
|
||||
// returns false if something goes wrong.
|
||||
//
|
||||
func call(rpcname string, args interface{}, reply interface{}) bool {
|
||||
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
|
||||
sockname := coordinatorSock()
|
||||
c, err := rpc.DialHTTP("unix", sockname)
|
||||
if err != nil {
|
||||
log.Fatal("dialing:", err)
|
||||
}
|
||||
defer c.Close()
|
||||
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
|
||||
sockname := coordinatorSock()
|
||||
c, err := rpc.DialHTTP("unix", sockname)
|
||||
if err != nil {
|
||||
log.Fatal("dialing:", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
err = c.Call(rpcname, args, reply)
|
||||
if err == nil {
|
||||
return true
|
||||
}
|
||||
err = c.Call(rpcname, args, reply)
|
||||
if err == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
fmt.Println(err)
|
||||
return false
|
||||
fmt.Println(err)
|
||||
return false
|
||||
}
|
||||
|
||||
@ -1,31 +0,0 @@
|
||||
package lock
|
||||
|
||||
import (
|
||||
|
||||
"6.5840/kvsrv1"
|
||||
"6.5840/kvsrv1/rpc"
|
||||
"6.5840/shardkv1/shardctrler/param"
|
||||
)
|
||||
|
||||
|
||||
type Lock struct {
|
||||
ck *kvsrv.Clerk
|
||||
|
||||
}
|
||||
|
||||
// Use l as the key to store the "lock state" (you would have to decide
|
||||
// precisely what the lock state is).
|
||||
func MakeLock(ck kvtest.IKVClerk, l string) *Lock {
|
||||
lk := &Lock{ck: ck.(*kvsrv.Clerk)}
|
||||
// You may add code here
|
||||
return lk
|
||||
}
|
||||
|
||||
|
||||
func (lk *Lock) Acquire() {
|
||||
// You may add code here.
|
||||
}
|
||||
|
||||
func (lk *Lock) Release() {
|
||||
// You may add code here.
|
||||
}
|
||||
@ -1,89 +0,0 @@
|
||||
package lock
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
// "log"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"6.5840/kvsrv1"
|
||||
"6.5840/kvsrv1/rpc"
|
||||
"6.5840/kvtest1"
|
||||
)
|
||||
|
||||
const (
|
||||
NACQUIRE = 10
|
||||
NCLNT = 10
|
||||
NSEC = 2
|
||||
)
|
||||
|
||||
func oneClient(t *testing.T, me int, ck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
|
||||
lk := MakeLock(ck, "l")
|
||||
ck.Put("l0", "", 0)
|
||||
for i := 1; true; i++ {
|
||||
select {
|
||||
case <-done:
|
||||
return kvtest.ClntRes{i, 0}
|
||||
default:
|
||||
lk.Acquire()
|
||||
|
||||
// log.Printf("%d: acquired lock", me)
|
||||
|
||||
b := strconv.Itoa(me)
|
||||
val, ver, err := ck.Get("l0")
|
||||
if err == rpc.OK {
|
||||
if val != "" {
|
||||
t.Fatalf("%d: two clients acquired lock %v", me, val)
|
||||
}
|
||||
} else {
|
||||
t.Fatalf("%d: get failed %v", me, err)
|
||||
}
|
||||
|
||||
err = ck.Put("l0", string(b), ver)
|
||||
if !(err == rpc.OK || err == rpc.ErrMaybe) {
|
||||
t.Fatalf("%d: put failed %v", me, err)
|
||||
}
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
err = ck.Put("l0", "", ver+1)
|
||||
if !(err == rpc.OK || err == rpc.ErrMaybe) {
|
||||
t.Fatalf("%d: put failed %v", me, err)
|
||||
}
|
||||
|
||||
// log.Printf("%d: release lock", me)
|
||||
|
||||
lk.Release()
|
||||
}
|
||||
}
|
||||
return kvtest.ClntRes{}
|
||||
}
|
||||
|
||||
// Run test clients
|
||||
func runClients(t *testing.T, nclnt int, reliable bool) {
|
||||
ts := kvsrv.MakeTestKV(t, reliable)
|
||||
defer ts.Cleanup()
|
||||
|
||||
ts.Begin(fmt.Sprintf("Test: %d lock clients", nclnt))
|
||||
|
||||
ts.SpawnClientsAndWait(nclnt, NSEC*time.Second, func(me int, myck kvtest.IKVClerk, done chan struct{}) kvtest.ClntRes {
|
||||
return oneClient(t, me, myck, done)
|
||||
})
|
||||
}
|
||||
|
||||
func TestOneClientReliable(t *testing.T) {
|
||||
runClients(t, 1, true)
|
||||
}
|
||||
|
||||
func TestManyClientsReliable(t *testing.T) {
|
||||
runClients(t, NCLNT, true)
|
||||
}
|
||||
|
||||
func TestOneClientUnreliable(t *testing.T) {
|
||||
runClients(t, 1, false)
|
||||
}
|
||||
|
||||
func TestManyClientsUnreliable(t *testing.T) {
|
||||
runClients(t, NCLNT, false)
|
||||
}
|
||||
@ -1,7 +0,0 @@
|
||||
package param
|
||||
|
||||
const (
|
||||
// Length of time that a lease is valid. If a client doesn't
|
||||
// refresh the lease within this time, the lease will expire.
|
||||
LEASETIMESEC = 3
|
||||
)
|
||||
@ -6,10 +6,7 @@ package shardctrler
|
||||
|
||||
import (
|
||||
|
||||
"sync/atomic"
|
||||
|
||||
"6.5840/kvsrv1"
|
||||
"6.5840/kvsrv1/rpc"
|
||||
"6.5840/kvtest1"
|
||||
"6.5840/shardkv1/shardcfg"
|
||||
"6.5840/tester1"
|
||||
@ -22,14 +19,13 @@ type ShardCtrler struct {
|
||||
kvtest.IKVClerk
|
||||
|
||||
killed int32 // set by Kill()
|
||||
leases bool
|
||||
|
||||
// Your data here.
|
||||
}
|
||||
|
||||
// Make a ShardCltler, which stores its state in a kvsrv.
|
||||
func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler {
|
||||
sck := &ShardCtrler{clnt: clnt, leases: leases}
|
||||
func MakeShardCtrler(clnt *tester.Clnt) *ShardCtrler {
|
||||
sck := &ShardCtrler{clnt: clnt}
|
||||
srv := tester.ServerName(tester.GRP0, 0)
|
||||
sck.IKVClerk = kvsrv.MakeClerk(clnt, srv)
|
||||
// Your code here.
|
||||
@ -38,20 +34,15 @@ func MakeShardCtrler(clnt *tester.Clnt, leases bool) *ShardCtrler {
|
||||
|
||||
// The tester calls InitController() before starting a new
|
||||
// controller. In part A, this method doesn't need to do anything. In
|
||||
// B and C, this method implements recovery (part B) and uses a lock
|
||||
// to become leader (part C).
|
||||
// B and C, this method implements recovery.
|
||||
func (sck *ShardCtrler) InitController() {
|
||||
}
|
||||
|
||||
// The tester calls ExitController to exit a controller. In part B and
|
||||
// C, release lock.
|
||||
func (sck *ShardCtrler) ExitController() {
|
||||
}
|
||||
|
||||
// Called once by the tester to supply the first configuration. You
|
||||
// can marshal ShardConfig into a string using shardcfg.String(), and
|
||||
// then Put it in the kvsrv for the controller at version 0. You can
|
||||
// pick the key to name the configuration.
|
||||
// pick the key to name the configuration. The initial configuration
|
||||
// lists shardgrp shardcfg.Gid1 for all shards.
|
||||
func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
|
||||
// Your code here
|
||||
}
|
||||
@ -61,25 +52,13 @@ func (sck *ShardCtrler) InitConfig(cfg *shardcfg.ShardConfig) {
|
||||
// changes the configuration it may be superseded by another
|
||||
// controller.
|
||||
func (sck *ShardCtrler) ChangeConfigTo(new *shardcfg.ShardConfig) {
|
||||
return
|
||||
}
|
||||
|
||||
// Tester "kills" shardctrler by calling Kill(). For your
|
||||
// convenience, we also supply isKilled() method to test killed in
|
||||
// loops.
|
||||
func (sck *ShardCtrler) Kill() {
|
||||
atomic.StoreInt32(&sck.killed, 1)
|
||||
}
|
||||
|
||||
func (sck *ShardCtrler) isKilled() bool {
|
||||
z := atomic.LoadInt32(&sck.killed)
|
||||
return z == 1
|
||||
}
|
||||
|
||||
|
||||
// Return the current configuration and its version number
|
||||
func (sck *ShardCtrler) Query() (*shardcfg.ShardConfig, rpc.Tversion) {
|
||||
// Your code here.
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
|
||||
// Return the current configuration
|
||||
func (sck *ShardCtrler) Query() *shardcfg.ShardConfig {
|
||||
// Your code here.
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -2,7 +2,6 @@ package shardgrp
|
||||
|
||||
import (
|
||||
|
||||
|
||||
"6.5840/kvsrv1/rpc"
|
||||
"6.5840/shardkv1/shardcfg"
|
||||
"6.5840/tester1"
|
||||
@ -11,7 +10,7 @@ import (
|
||||
type Clerk struct {
|
||||
clnt *tester.Clnt
|
||||
servers []string
|
||||
leader int // last successful leader (index into servers[])
|
||||
// You will have to modify this struct.
|
||||
}
|
||||
|
||||
func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk {
|
||||
@ -19,24 +18,27 @@ func MakeClerk(clnt *tester.Clnt, servers []string) *Clerk {
|
||||
return ck
|
||||
}
|
||||
|
||||
func (ck *Clerk) Get(key string, n shardcfg.Tnum) (string, rpc.Tversion, rpc.Err) {
|
||||
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
|
||||
// Your code here
|
||||
return "", 0, ""
|
||||
}
|
||||
|
||||
func (ck *Clerk) Put(key string, value string, version rpc.Tversion, n shardcfg.Tnum) (bool, rpc.Err) {
|
||||
func (ck *Clerk) Put(key string, value string, version rpc.Tversion) rpc.Err {
|
||||
// Your code here
|
||||
return false, ""
|
||||
return ""
|
||||
}
|
||||
|
||||
func (ck *Clerk) Freeze(s shardcfg.Tshid, num shardcfg.Tnum) ([]byte, rpc.Err) {
|
||||
func (ck *Clerk) FreezeShard(s shardcfg.Tshid, num shardcfg.Tnum) ([]byte, rpc.Err) {
|
||||
// Your code here
|
||||
return nil, ""
|
||||
}
|
||||
|
||||
func (ck *Clerk) InstallShard(s shardcfg.Tshid, state []byte, num shardcfg.Tnum) rpc.Err {
|
||||
// Your code here
|
||||
return ""
|
||||
}
|
||||
|
||||
func (ck *Clerk) Delete(s shardcfg.Tshid, num shardcfg.Tnum) rpc.Err {
|
||||
func (ck *Clerk) DeleteShard(s shardcfg.Tshid, num shardcfg.Tnum) rpc.Err {
|
||||
// Your code here
|
||||
return ""
|
||||
}
|
||||
|
||||
@ -14,11 +14,12 @@ import (
|
||||
|
||||
|
||||
type KVServer struct {
|
||||
gid tester.Tgid
|
||||
me int
|
||||
dead int32 // set by Kill()
|
||||
rsm *rsm.RSM
|
||||
gid tester.Tgid
|
||||
|
||||
// Your code here
|
||||
}
|
||||
|
||||
|
||||
@ -37,17 +38,17 @@ func (kv *KVServer) Restore(data []byte) {
|
||||
// Your code here
|
||||
}
|
||||
|
||||
func (kv *KVServer) Get(args *shardrpc.GetArgs, reply *rpc.GetReply) {
|
||||
func (kv *KVServer) Get(args *rpc.GetArgs, reply *rpc.GetReply) {
|
||||
// Your code here
|
||||
}
|
||||
|
||||
func (kv *KVServer) Put(args *shardrpc.PutArgs, reply *rpc.PutReply) {
|
||||
func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) {
|
||||
// Your code here
|
||||
}
|
||||
|
||||
// Freeze the specified shard (i.e., reject future Get/Puts for this
|
||||
// shard) and return the key/values stored in that shard.
|
||||
func (kv *KVServer) Freeze(args *shardrpc.FreezeArgs, reply *shardrpc.FreezeReply) {
|
||||
func (kv *KVServer) FreezeShard(args *shardrpc.FreezeShardArgs, reply *shardrpc.FreezeShardReply) {
|
||||
// Your code here
|
||||
}
|
||||
|
||||
@ -57,7 +58,7 @@ func (kv *KVServer) InstallShard(args *shardrpc.InstallShardArgs, reply *shardrp
|
||||
}
|
||||
|
||||
// Delete the specified shard.
|
||||
func (kv *KVServer) Delete(args *shardrpc.DeleteShardArgs, reply *shardrpc.DeleteShardReply) {
|
||||
func (kv *KVServer) DeleteShard(args *shardrpc.DeleteShardArgs, reply *shardrpc.DeleteShardReply) {
|
||||
// Your code here
|
||||
}
|
||||
|
||||
@ -86,9 +87,9 @@ func (kv *KVServer) killed() bool {
|
||||
func StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, persister *tester.Persister, maxraftstate int) []tester.IService {
|
||||
// call labgob.Register on structures you want
|
||||
// Go's RPC library to marshall/unmarshall.
|
||||
labgob.Register(shardrpc.PutArgs{})
|
||||
labgob.Register(shardrpc.GetArgs{})
|
||||
labgob.Register(shardrpc.FreezeArgs{})
|
||||
labgob.Register(rpc.PutArgs{})
|
||||
labgob.Register(rpc.GetArgs{})
|
||||
labgob.Register(shardrpc.FreezeShardArgs{})
|
||||
labgob.Register(shardrpc.InstallShardArgs{})
|
||||
labgob.Register(shardrpc.DeleteShardArgs{})
|
||||
labgob.Register(rsm.Op{})
|
||||
@ -97,5 +98,6 @@ func StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid, me int, p
|
||||
kv.rsm = rsm.MakeRSM(servers, me, persister, maxraftstate, kv)
|
||||
|
||||
// Your code here
|
||||
|
||||
return []tester.IService{kv, kv.rsm.Raft()}
|
||||
}
|
||||
|
||||
@ -5,26 +5,12 @@ import (
|
||||
"6.5840/shardkv1/shardcfg"
|
||||
)
|
||||
|
||||
// Same as Put in kvsrv1/rpc, but with a configuration number
|
||||
type PutArgs struct {
|
||||
Key string
|
||||
Value string
|
||||
Version rpc.Tversion
|
||||
Num shardcfg.Tnum
|
||||
}
|
||||
|
||||
// Same as Get in kvsrv1/rpc, but with a configuration number.
|
||||
type GetArgs struct {
|
||||
Key string
|
||||
Num shardcfg.Tnum
|
||||
}
|
||||
|
||||
type FreezeArgs struct {
|
||||
type FreezeShardArgs struct {
|
||||
Shard shardcfg.Tshid
|
||||
Num shardcfg.Tnum
|
||||
}
|
||||
|
||||
type FreezeReply struct {
|
||||
type FreezeShardReply struct {
|
||||
State []byte
|
||||
Num shardcfg.Tnum
|
||||
Err rpc.Err
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
package shardkv
|
||||
|
||||
import (
|
||||
"log"
|
||||
//"log"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -9,7 +9,6 @@ import (
|
||||
"6.5840/kvtest1"
|
||||
"6.5840/shardkv1/shardcfg"
|
||||
"6.5840/shardkv1/shardctrler"
|
||||
"6.5840/shardkv1/shardctrler/param"
|
||||
"6.5840/tester1"
|
||||
)
|
||||
|
||||
@ -28,7 +27,7 @@ func TestInitQuery5A(t *testing.T) {
|
||||
defer ts.Cleanup()
|
||||
|
||||
// Make a shard controller
|
||||
sck := shardctrler.MakeShardCtrler(ts.Config.MakeClient(), ts.leases)
|
||||
sck := shardctrler.MakeShardCtrler(ts.Config.MakeClient())
|
||||
|
||||
// Make an empty shard configuration
|
||||
scfg := shardcfg.MakeShardConfig()
|
||||
@ -41,9 +40,9 @@ func TestInitQuery5A(t *testing.T) {
|
||||
sck.InitConfig(scfg)
|
||||
|
||||
// Read the initial configuration and check it
|
||||
cfg, v := sck.Query()
|
||||
if v != 1 || cfg.Num != 1 || cfg.Shards[0] != shardcfg.Gid1 {
|
||||
ts.t.Fatalf("Static wrong %v %v", cfg, v)
|
||||
cfg := sck.Query()
|
||||
if cfg.Num != 1 || cfg.Shards[0] != shardcfg.Gid1 {
|
||||
ts.t.Fatalf("Static wrong %v", cfg)
|
||||
}
|
||||
cfg.CheckConfig(t, []tester.Tgid{shardcfg.Gid1})
|
||||
}
|
||||
@ -68,7 +67,7 @@ func TestStaticOneShardGroup5A(t *testing.T) {
|
||||
|
||||
// disconnect raft leader of shardgrp and check that keys are
|
||||
// still available
|
||||
ts.disconnectClntFromLeader(ck.(*kvtest.TestClerk).Clnt, shardcfg.Gid1)
|
||||
ts.disconnectClntFromLeader(shardcfg.Gid1)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1)) // check the puts
|
||||
@ -87,14 +86,14 @@ func TestJoinBasic5A(t *testing.T) {
|
||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
||||
|
||||
sck := ts.ShardCtrler()
|
||||
cfg, _ := sck.Query()
|
||||
cfg := sck.Query()
|
||||
|
||||
gid2 := ts.newGid()
|
||||
if ok := ts.joinGroups(sck, []tester.Tgid{gid2}); !ok {
|
||||
ts.t.Fatalf("TestJoinBasic5A: joinGroups failed")
|
||||
}
|
||||
|
||||
cfg1, _ := sck.Query()
|
||||
cfg1 := sck.Query()
|
||||
if cfg.Num+1 != cfg1.Num {
|
||||
ts.t.Fatalf("TestJoinBasic5A: wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
|
||||
}
|
||||
@ -270,7 +269,7 @@ func TestShutdown5A(t *testing.T) {
|
||||
|
||||
// Test that Gets for keys at groups that are alive
|
||||
// return
|
||||
func TestProgressShutdown(t *testing.T) {
|
||||
func TestProgressShutdown5A(t *testing.T) {
|
||||
const (
|
||||
NJOIN = 4
|
||||
NSEC = 2
|
||||
@ -299,7 +298,7 @@ func TestProgressShutdown(t *testing.T) {
|
||||
alive[g] = true
|
||||
}
|
||||
|
||||
cfg, _ := sck.Query()
|
||||
cfg := sck.Query()
|
||||
|
||||
ch := make(chan rpc.Err)
|
||||
go func() {
|
||||
@ -322,7 +321,7 @@ func TestProgressShutdown(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test that Gets from a non-moving shard return quickly
|
||||
func TestProgressJoin(t *testing.T) {
|
||||
func TestProgressJoin5A(t *testing.T) {
|
||||
const (
|
||||
NJOIN = 4
|
||||
NSEC = 4
|
||||
@ -341,7 +340,7 @@ func TestProgressJoin(t *testing.T) {
|
||||
grps := ts.groups(NJOIN)
|
||||
ts.joinGroups(sck, grps)
|
||||
|
||||
cfg, _ := sck.Query()
|
||||
cfg := sck.Query()
|
||||
newcfg := cfg.Copy()
|
||||
newgid := tester.Tgid(NJOIN + 3)
|
||||
if ok := newcfg.JoinBalance(map[tester.Tgid][]string{newgid: []string{"xxx"}}); !ok {
|
||||
@ -409,7 +408,7 @@ func TestProgressJoin(t *testing.T) {
|
||||
|
||||
select {
|
||||
case cnt := <-ch1:
|
||||
log.Printf("cnt %d", cnt)
|
||||
//log.Printf("cnt %d", cnt)
|
||||
if cnt < NCNT {
|
||||
ts.Fatalf("Two few gets finished %d; expected more than %d", cnt, NCNT)
|
||||
}
|
||||
@ -492,7 +491,7 @@ func TestJoinLeave5B(t *testing.T) {
|
||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
||||
|
||||
sck := ts.ShardCtrler()
|
||||
cfg, _ := sck.Query()
|
||||
cfg := sck.Query()
|
||||
|
||||
ts.Group(gid1).Shutdown()
|
||||
|
||||
@ -521,7 +520,7 @@ func TestJoinLeave5B(t *testing.T) {
|
||||
ts.Fatalf("Join didn't complete")
|
||||
}
|
||||
|
||||
cfg1, _ := sck.Query()
|
||||
cfg1 := sck.Query()
|
||||
if cfg.Num+1 != cfg1.Num {
|
||||
ts.t.Fatalf("wrong num %d expected %d ", cfg1.Num, cfg.Num+1)
|
||||
}
|
||||
@ -569,140 +568,28 @@ func TestRecoverCtrler5B(t *testing.T) {
|
||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
||||
|
||||
for i := 0; i < NPARTITION; i++ {
|
||||
ts.killCtrler(ck, gid, ka, va)
|
||||
ts.partitionCtrler(ck, gid, ka, va)
|
||||
}
|
||||
}
|
||||
|
||||
// Test concurrent ctrlers fighting for leadership reliable
|
||||
func TestAcquireLockConcurrentReliable5C(t *testing.T) {
|
||||
ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers acquiring leadership ...", true)
|
||||
func TestConcurrentReliable5C(t *testing.T) {
|
||||
ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers ...", true)
|
||||
defer ts.Cleanup()
|
||||
ts.setupKVService()
|
||||
ck := ts.MakeClerk()
|
||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
||||
ts.electCtrler(ck, ka, va)
|
||||
ts.concurCtrler(ck, ka, va)
|
||||
}
|
||||
|
||||
// Test concurrent ctrlers fighting for leadership unreliable
|
||||
func TestAcquireLockConcurrentUnreliable5C(t *testing.T) {
|
||||
ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers acquiring leadership ...", false)
|
||||
ts := MakeTestLeases(t, "Test (5C): Concurent ctrlers ...", false)
|
||||
defer ts.Cleanup()
|
||||
ts.setupKVService()
|
||||
ck := ts.MakeClerk()
|
||||
ka, va := ts.SpreadPuts(ck, NKEYS)
|
||||
ts.electCtrler(ck, ka, va)
|
||||
}
|
||||
|
||||
// Test that ReleaseLock allows a new leader to start quickly
|
||||
func TestLeaseBasicRelease5C(t *testing.T) {
|
||||
ts := MakeTestLeases(t, "Test (5C): release lease ...", true)
|
||||
defer ts.Cleanup()
|
||||
ts.setupKVService()
|
||||
|
||||
sck0, clnt0 := ts.makeShardCtrlerClnt()
|
||||
go func() {
|
||||
sck0.InitController()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
sck0.ExitController()
|
||||
}()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// start new controller
|
||||
sck1, clnt1 := ts.makeShardCtrlerClnt()
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
sck1.InitController()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
sck1.ExitController()
|
||||
ch <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(1 * time.Second):
|
||||
ts.Fatalf("Release didn't give up leadership")
|
||||
case <-ch:
|
||||
}
|
||||
|
||||
ts.Config.DeleteClient(clnt0)
|
||||
ts.Config.DeleteClient(clnt1)
|
||||
}
|
||||
|
||||
// Test lease expiring
|
||||
func TestLeaseBasicExpire5C(t *testing.T) {
|
||||
ts := MakeTestLeases(t, "Test (5C): lease expiring ...", true)
|
||||
defer ts.Cleanup()
|
||||
ts.setupKVService()
|
||||
|
||||
sck0, clnt0 := ts.makeShardCtrlerClnt()
|
||||
go func() {
|
||||
sck0.InitController()
|
||||
for {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// partition sck0 forever
|
||||
clnt0.DisconnectAll()
|
||||
|
||||
// start new controller
|
||||
sck1, clnt1 := ts.makeShardCtrlerClnt()
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
sck1.InitController()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
sck1.ExitController()
|
||||
ch <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After((param.LEASETIMESEC + 1) * time.Second):
|
||||
ts.Fatalf("Lease didn't expire")
|
||||
case <-ch:
|
||||
}
|
||||
|
||||
ts.Config.DeleteClient(clnt0)
|
||||
ts.Config.DeleteClient(clnt1)
|
||||
}
|
||||
|
||||
// Test lease is being extended
|
||||
func TestLeaseBasicRefresh5C(t *testing.T) {
|
||||
const LEADERSEC = 3
|
||||
|
||||
ts := MakeTestLeases(t, "Test (5C): lease refresh ...", true)
|
||||
defer ts.Cleanup()
|
||||
ts.setupKVService()
|
||||
|
||||
sck0, clnt0 := ts.makeShardCtrlerClnt()
|
||||
go func() {
|
||||
sck0.InitController()
|
||||
time.Sleep(LEADERSEC * param.LEASETIMESEC * time.Second)
|
||||
sck0.ExitController()
|
||||
}()
|
||||
|
||||
// give sck0 time to become leader
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// start new controller
|
||||
sck1, clnt1 := ts.makeShardCtrlerClnt()
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
sck1.InitController()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
sck1.ExitController()
|
||||
ch <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After((LEADERSEC + param.LEASETIMESEC + 1) * time.Second):
|
||||
case <-ch:
|
||||
ts.Fatalf("Lease not refreshed")
|
||||
}
|
||||
|
||||
ts.Config.DeleteClient(clnt0)
|
||||
ts.Config.DeleteClient(clnt1)
|
||||
ts.concurCtrler(ck, ka, va)
|
||||
}
|
||||
|
||||
// Test if old leader is fenced off when reconnecting while it is in
|
||||
@ -710,6 +597,7 @@ func TestLeaseBasicRefresh5C(t *testing.T) {
|
||||
func TestPartitionControllerJoin5C(t *testing.T) {
|
||||
const (
|
||||
NSLEEP = 2
|
||||
NSEC = 1
|
||||
RAND = 1000
|
||||
)
|
||||
|
||||
@ -739,8 +627,8 @@ func TestPartitionControllerJoin5C(t *testing.T) {
|
||||
// partition sck
|
||||
clnt.DisconnectAll()
|
||||
|
||||
// wait until sck's lease expired before restarting shardgrp `ngid`
|
||||
time.Sleep((param.LEASETIMESEC + 1) * time.Second)
|
||||
// wait a while before restarting shardgrp `ngid`
|
||||
time.Sleep(NSEC * time.Second)
|
||||
|
||||
ts.Group(ngid).StartServers()
|
||||
|
||||
@ -748,13 +636,11 @@ func TestPartitionControllerJoin5C(t *testing.T) {
|
||||
sck0 := ts.makeShardCtrler()
|
||||
sck0.InitController()
|
||||
|
||||
scfg, _ := sck0.Query()
|
||||
scfg := sck0.Query()
|
||||
if !scfg.IsMember(ngid) {
|
||||
t.Fatalf("Didn't recover gid %d", ngid)
|
||||
}
|
||||
|
||||
sck0.ExitController()
|
||||
|
||||
// reconnect old controller, which shouldn't finish ChangeConfigTo
|
||||
clnt.ConnectAll()
|
||||
|
||||
@ -795,7 +681,7 @@ func partitionRecovery5C(t *testing.T, reliable bool, npart, nclnt int) {
|
||||
}
|
||||
|
||||
for i := 0; i < npart; i++ {
|
||||
ts.killCtrler(ck, gid, ka, va)
|
||||
ts.partitionCtrler(ck, gid, ka, va)
|
||||
}
|
||||
|
||||
if nclnt > 0 {
|
||||
@ -827,7 +713,7 @@ func TestPartitionRecoveryReliableClerks5C(t *testing.T) {
|
||||
|
||||
func TestPartitionRecoveryUnreliableClerks5C(t *testing.T) {
|
||||
const (
|
||||
NPARTITION = 5
|
||||
NPARTITION = 3
|
||||
)
|
||||
partitionRecovery5C(t, false, NPARTITION, 5)
|
||||
}
|
||||
|
||||
@ -16,7 +16,6 @@ import (
|
||||
"6.5840/labrpc"
|
||||
"6.5840/shardkv1/shardcfg"
|
||||
"6.5840/shardkv1/shardctrler"
|
||||
"6.5840/shardkv1/shardctrler/param"
|
||||
"6.5840/shardkv1/shardgrp"
|
||||
"6.5840/tester1"
|
||||
)
|
||||
@ -25,9 +24,9 @@ type Test struct {
|
||||
t *testing.T
|
||||
*kvtest.Test
|
||||
|
||||
sck *shardctrler.ShardCtrler
|
||||
part string
|
||||
leases bool
|
||||
sck *shardctrler.ShardCtrler
|
||||
part string
|
||||
partition bool
|
||||
|
||||
maxraftstate int
|
||||
mu sync.Mutex
|
||||
@ -41,11 +40,11 @@ const (
|
||||
)
|
||||
|
||||
// Setup kvserver for the shard controller and make the controller
|
||||
func MakeTestMaxRaft(t *testing.T, part string, reliable, leases bool, maxraftstate int) *Test {
|
||||
func MakeTestMaxRaft(t *testing.T, part string, reliable, partition bool, maxraftstate int) *Test {
|
||||
ts := &Test{
|
||||
ngid: shardcfg.Gid1 + 1, // Gid1 is in use
|
||||
t: t,
|
||||
leases: leases,
|
||||
partition: partition,
|
||||
maxraftstate: maxraftstate,
|
||||
}
|
||||
cfg := tester.MakeConfig(t, 1, reliable, kvsrv.StartKVServer)
|
||||
@ -86,7 +85,7 @@ func (ts *Test) makeShardCtrler() *shardctrler.ShardCtrler {
|
||||
|
||||
func (ts *Test) makeShardCtrlerClnt() (*shardctrler.ShardCtrler, *tester.Clnt) {
|
||||
clnt := ts.Config.MakeClient()
|
||||
return shardctrler.MakeShardCtrler(clnt, ts.leases), clnt
|
||||
return shardctrler.MakeShardCtrler(clnt), clnt
|
||||
}
|
||||
|
||||
func (ts *Test) makeKVClerk() *kvsrv.Clerk {
|
||||
@ -128,14 +127,14 @@ func (ts *Test) StartServerShardGrp(servers []*labrpc.ClientEnd, gid tester.Tgid
|
||||
}
|
||||
|
||||
func (ts *Test) checkMember(sck *shardctrler.ShardCtrler, gid tester.Tgid) bool {
|
||||
cfg, _ := sck.Query()
|
||||
cfg := sck.Query()
|
||||
ok := cfg.IsMember(gid)
|
||||
return ok
|
||||
}
|
||||
|
||||
// Add group gid
|
||||
func (ts *Test) join(sck *shardctrler.ShardCtrler, gid tester.Tgid, srvs []string) {
|
||||
cfg, _ := sck.Query()
|
||||
cfg := sck.Query()
|
||||
newcfg := cfg.Copy()
|
||||
ok := newcfg.JoinBalance(map[tester.Tgid][]string{gid: srvs})
|
||||
if !ok {
|
||||
@ -158,7 +157,7 @@ func (ts *Test) joinGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) boo
|
||||
|
||||
// Group gid leaves.
|
||||
func (ts *Test) leave(sck *shardctrler.ShardCtrler, gid tester.Tgid) {
|
||||
cfg, _ := sck.Query()
|
||||
cfg := sck.Query()
|
||||
newcfg := cfg.Copy()
|
||||
ok := newcfg.LeaveBalance([]tester.Tgid{gid})
|
||||
if !ok {
|
||||
@ -179,25 +178,12 @@ func (ts *Test) leaveGroups(sck *shardctrler.ShardCtrler, gids []tester.Tgid) bo
|
||||
return true
|
||||
}
|
||||
|
||||
func (ts *Test) disconnectRaftLeader(gid tester.Tgid) (int, string) {
|
||||
_, l := rsm.Leader(ts.Config, gid)
|
||||
g := ts.Group(gid)
|
||||
ln := g.SrvName(l)
|
||||
g.DisconnectAll(l)
|
||||
return l, ln
|
||||
}
|
||||
|
||||
func (ts *Test) reconnectOldLeader(gid tester.Tgid, l int) {
|
||||
g := ts.Group(gid)
|
||||
g.ConnectOne(l)
|
||||
}
|
||||
|
||||
func (ts *Test) disconnectClntFromLeader(clnt *tester.Clnt, gid tester.Tgid) int {
|
||||
l, ln := ts.disconnectRaftLeader(gid)
|
||||
p := ts.Group(gid).AllowServersExcept(l)
|
||||
srvs := ts.Group(gid).SrvNamesTo(p)
|
||||
clnt.Disconnect(ln)
|
||||
clnt.ConnectTo(srvs)
|
||||
func (ts *Test) disconnectClntFromLeader(gid tester.Tgid) int {
|
||||
ok, l := rsm.Leader(ts.Config, gid)
|
||||
if !ok {
|
||||
log.Fatalf("Leader failed")
|
||||
}
|
||||
ts.Group(gid).ShutdownServer(l)
|
||||
return l
|
||||
}
|
||||
|
||||
@ -252,15 +238,14 @@ func (ts *Test) checkShutdownSharding(down tester.Tgid, ka []string, va []string
|
||||
|
||||
// Run one controler and then partition it after some time. Run
|
||||
// another cntrler that must finish the first ctrler's unfinished
|
||||
// shard moves. To ensure first ctrler is in a join/leave the test
|
||||
// shuts down shardgrp `gid`. After the second controller is done,
|
||||
// heal the partition to test if Freeze,InstallShard, and Delete are
|
||||
// are fenced.
|
||||
func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
|
||||
// shard moves. To make it likely that first ctrler is in a join/leave
|
||||
// the test shuts down shardgrp `gid`. After the second controller is
|
||||
// done, heal the partition. partitionCtrler returns if recovery
|
||||
// happened.
|
||||
func (ts *Test) partitionCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string) {
|
||||
const (
|
||||
NSLEEP = 2
|
||||
|
||||
RAND = 1000
|
||||
RAND = 400
|
||||
NSEC = 1
|
||||
|
||||
JOIN = 1
|
||||
LEAVE = 2
|
||||
@ -269,7 +254,7 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string)
|
||||
sck, clnt := ts.makeShardCtrlerClnt()
|
||||
sck.InitController()
|
||||
|
||||
cfg, _ := ts.ShardCtrler().Query()
|
||||
cfg := ts.ShardCtrler().Query()
|
||||
num := cfg.Num
|
||||
|
||||
state := 0
|
||||
@ -283,50 +268,57 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string)
|
||||
state = LEAVE
|
||||
ts.leaveGroups(sck, []tester.Tgid{ngid})
|
||||
} else {
|
||||
//log.Printf("deposed")
|
||||
//log.Printf("%v: deposed", sck.Id())
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// let sck run for a little while
|
||||
time.Sleep(1000 * time.Millisecond)
|
||||
|
||||
r := rand.Int() % RAND
|
||||
d := time.Duration(r) * time.Millisecond
|
||||
time.Sleep(d)
|
||||
|
||||
//log.Printf("shutdown gid %d after %dms", gid, r)
|
||||
//log.Printf("shutdown gid %d after %dms %v", gid, r, time.Now().Sub(t))
|
||||
|
||||
ts.Group(gid).Shutdown()
|
||||
|
||||
// sleep for a while to get the chance for the controler to get stuck
|
||||
// in join or leave, because gid is down
|
||||
time.Sleep(NSLEEP * time.Second)
|
||||
// sleep for a while to get sck stuck in join or leave, because
|
||||
// gid is down
|
||||
time.Sleep(1000 * time.Millisecond)
|
||||
|
||||
//log.Printf("disconnect sck %v ngid %d num %d state %d", d, ngid, num, state)
|
||||
|
||||
// partition controller
|
||||
clnt.DisconnectAll()
|
||||
|
||||
if ts.leases {
|
||||
// wait until sck's lease expired before restarting shardgrp `gid`
|
||||
time.Sleep((param.LEASETIMESEC + 1) * time.Second)
|
||||
if ts.partition {
|
||||
// wait a while before restarting shardgrp `gid`
|
||||
time.Sleep(NSEC * time.Second)
|
||||
}
|
||||
|
||||
//log.Printf("startservers %v lease expired %t", time.Now().Sub(t), ts.leases)
|
||||
|
||||
ts.Group(gid).StartServers()
|
||||
|
||||
// start new controler to pick up where sck left off
|
||||
sck0, clnt0 := ts.makeShardCtrlerClnt()
|
||||
|
||||
sck0.InitController()
|
||||
cfg, _ = sck0.Query()
|
||||
cfg = sck0.Query()
|
||||
s := "join"
|
||||
if state == LEAVE {
|
||||
s = "leave"
|
||||
}
|
||||
//log.Printf("%v cfg %v recovered %s", s, cfg, s)
|
||||
|
||||
if cfg.Num <= num {
|
||||
ts.Fatalf("didn't recover; expected %d > %d", num, cfg.Num)
|
||||
}
|
||||
|
||||
//log.Printf("%v: recovered %v %v %v", sck0.Id(), time.Now().Sub(t), s, cfg)
|
||||
|
||||
present := cfg.IsMember(ngid)
|
||||
if (state == JOIN && !present) || (state == LEAVE && present) {
|
||||
ts.Fatalf("didn't recover %d correctly after %v", ngid, s)
|
||||
@ -337,32 +329,30 @@ func (ts *Test) killCtrler(ck kvtest.IKVClerk, gid tester.Tgid, ka, va []string)
|
||||
ts.leaveGroups(sck0, []tester.Tgid{ngid})
|
||||
}
|
||||
|
||||
for i := 0; i < len(ka); i++ {
|
||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
||||
}
|
||||
|
||||
sck0.ExitController()
|
||||
|
||||
if ts.leases {
|
||||
//log.Printf("reconnect old controller")
|
||||
|
||||
if ts.partition {
|
||||
// reconnect old controller, which should bail out, because
|
||||
// it has been superseded.
|
||||
clnt.ConnectAll()
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
for i := 0; i < len(ka); i++ {
|
||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
||||
}
|
||||
}
|
||||
|
||||
//log.Printf("reconnected %v", time.Now().Sub(t))
|
||||
|
||||
for i := 0; i < len(ka); i++ {
|
||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
||||
}
|
||||
|
||||
//log.Printf("done get %v", time.Now().Sub(t))
|
||||
|
||||
ts.Config.DeleteClient(clnt)
|
||||
ts.Config.DeleteClient(clnt0)
|
||||
}
|
||||
|
||||
func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
|
||||
func (ts *Test) concurCtrler(ck kvtest.IKVClerk, ka, va []string) {
|
||||
const (
|
||||
NSEC = 5
|
||||
NSEC = 2
|
||||
N = 4
|
||||
)
|
||||
|
||||
@ -376,16 +366,16 @@ func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
|
||||
ngid := ts.newGid()
|
||||
sck := ts.makeShardCtrler()
|
||||
sck.InitController()
|
||||
//log.Printf("%d(%p): join/leave %v", i, sck, ngid)
|
||||
//log.Printf("%v: electCtrler %d join/leave %v", sck.Id(), i, ngid)
|
||||
ts.joinGroups(sck, []tester.Tgid{ngid})
|
||||
if ok := ts.checkMember(sck, ngid); ok {
|
||||
//log.Printf("%v: electCtrler %d leave %d", sck.Id(), i, ngid)
|
||||
if ok := ts.leaveGroups(sck, []tester.Tgid{ngid}); !ok {
|
||||
log.Fatalf("electCtrler: %d(%p): leave %v failed", i, sck, ngid)
|
||||
//log.Printf("%v: electCtrler %d leave %v failed", sck.Id(), i, ngid)
|
||||
}
|
||||
} else {
|
||||
log.Fatalf("electCtrler: %d(%p): join %v failed", i, sck, ngid)
|
||||
//log.Printf("%v: electCtrler %d join %v failed", sck.Id(), i, ngid)
|
||||
}
|
||||
sck.ExitController()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -399,6 +389,7 @@ func (ts *Test) electCtrler(ck kvtest.IKVClerk, ka, va []string) {
|
||||
for i := 0; i < N; i++ {
|
||||
ch <- struct{}{}
|
||||
}
|
||||
|
||||
for i := 0; i < len(ka); i++ {
|
||||
ts.CheckGet(ck, ka[i], va[i], rpc.Tversion(1))
|
||||
}
|
||||
|
||||
@ -1,169 +0,0 @@
|
||||
=== RUN TestAcquireLockConcurrentReliable5C
|
||||
Test (5C): Concurent ctrlers acquiring leadership ... (reliable network)...
|
||||
2025/03/20 09:18:33 PecUxIPV: acquire success 1 l 2.999731394s
|
||||
2025/03/20 09:18:35 PecUxIPV: Release 3
|
||||
2025/03/20 09:18:35 aKDBLFuF: acquire success 4 l 2.999504542s
|
||||
2025/03/20 09:18:37 aKDBLFuF: Release 6
|
||||
2025/03/20 09:18:37 HxhaFlAP: acquire success 7 l 2.999622621s
|
||||
2025/03/20 09:18:39 HxhaFlAP: Release 9
|
||||
2025/03/20 09:18:39 LpTmFCGC: acquire success 10 l 2.999747179s
|
||||
2025/03/20 09:18:41 LpTmFCGC: Release 13
|
||||
2025/03/20 09:18:41 klmldUQn: acquire success 14 l 2.999558604s
|
||||
2025/03/20 09:18:43 klmldUQn: Release 17
|
||||
2025/03/20 09:18:43 AWgiWKPZ: acquire success 18 l 2.999701903s
|
||||
2025/03/20 09:18:46 AWgiWKPZ: Release 21
|
||||
... Passed -- 16.4 1 2061 120
|
||||
--- PASS: TestAcquireLockConcurrentReliable5C (16.38s)
|
||||
=== RUN TestAcquireLockConcurrentUnreliable5C
|
||||
Test (5C): Concurent ctrlers acquiring leadership ... (unreliable network)...
|
||||
2025/03/20 09:19:00 xulPPlwd: acquire success 2 l 2.768860613s
|
||||
2025/03/20 09:19:05 xulPPlwd: Release 6
|
||||
2025/03/20 09:19:05 SGXgIJeR: acquire success 7 l 2.984694448s
|
||||
2025/03/20 09:19:08 SGXgIJeR: Release 11
|
||||
2025/03/20 09:19:08 kNvktGla: acquire success 12 l 2.986135242s
|
||||
2025/03/20 09:19:13 kNvktGla: Release 17
|
||||
2025/03/20 09:19:13 usGKuyeI: acquire success 18 l 2.97484218s
|
||||
2025/03/20 09:19:19 usGKuyeI: Release 24
|
||||
... Passed -- 38.4 1 2226 120
|
||||
--- PASS: TestAcquireLockConcurrentUnreliable5C (38.37s)
|
||||
=== RUN TestLeaseBasicRelease5C
|
||||
Test (5C): release lease ... (reliable network)...
|
||||
2025/03/20 09:19:25 fWllyjFs: acquire success 1 l 2.999778852s
|
||||
2025/03/20 09:19:25 fWllyjFs: Release 2
|
||||
2025/03/20 09:19:25 HqoctgYf: acquire success 3 l 2.999623311s
|
||||
2025/03/20 09:19:26 HqoctgYf: Release 4
|
||||
... Passed -- 0.4 1 17 0
|
||||
--- PASS: TestLeaseBasicRelease5C (0.42s)
|
||||
=== RUN TestLeaseBasicExpire5C
|
||||
Test (5C): lease expiring ... (reliable network)...
|
||||
2025/03/20 09:19:26 MgmIiwHw: acquire success 1 l 2.999622077s
|
||||
2025/03/20 09:19:29 PviuBaqZ: acquire: MgmIiwHw lease expired -31.512117ms
|
||||
2025/03/20 09:19:29 PviuBaqZ: acquire success 2 l 2.9996929s
|
||||
2025/03/20 09:19:29 PviuBaqZ: Release 3
|
||||
... Passed -- 3.1 1 81 0
|
||||
--- PASS: TestLeaseBasicExpire5C (3.14s)
|
||||
=== RUN TestLeaseBasicRefresh5C
|
||||
Test (5C): lease refresh ... (reliable network)...
|
||||
2025/03/20 09:19:29 CqhHcMdl: acquire success 1 l 2.999690343s
|
||||
... Passed -- 7.1 1 144 0
|
||||
--- PASS: TestLeaseBasicRefresh5C (7.10s)
|
||||
=== RUN TestPartitionControllerJoin5C
|
||||
Test (5C): partition controller in join... (reliable network)...
|
||||
2025/03/20 09:19:38 CqhHcMdl: Release 9
|
||||
2025/03/20 09:19:38 QykadXGi: acquire success 1 l 2.999763148s
|
||||
2025/03/20 09:19:43 YWktoCTH: acquire: QykadXGi lease expired -2.003411436s
|
||||
2025/03/20 09:19:43 YWktoCTH: acquire success 2 l 2.999580573s
|
||||
2025/03/20 09:19:45 YWktoCTH: Release 4
|
||||
2025/03/20 09:19:45 QykadXGi: refresher: exit expired -3.255782562s
|
||||
... Passed -- 11.2 1 1011 120
|
||||
--- PASS: TestPartitionControllerJoin5C (11.22s)
|
||||
=== RUN TestPartitionRecoveryReliableNoClerk5C
|
||||
Test (5C): controllers with leased leadership ... (reliable network)...
|
||||
2025/03/20 09:19:50 nLqpPYYg: acquire success 1 l 2.999773699s
|
||||
2025/03/20 09:19:56 Yauplngb: acquire: nLqpPYYg lease expired -1.030252686s
|
||||
2025/03/20 09:19:56 Yauplngb: acquire success 4 l 2.999760357s
|
||||
2025/03/20 09:19:58 Yauplngb: Release 7
|
||||
2025/03/20 09:19:58 nLqpPYYg: refresher: exit expired -3.848348135s
|
||||
2025/03/20 09:20:00 dsvADejV: acquire success 8 l 2.999675453s
|
||||
2025/03/20 09:20:07 jdcPVdvf: acquire: dsvADejV lease expired -1.56610473s
|
||||
2025/03/20 09:20:07 jdcPVdvf: acquire success 11 l 2.999839821s
|
||||
2025/03/20 09:20:10 jdcPVdvf: Release 15
|
||||
2025/03/20 09:20:10 dsvADejV: refresher: exit expired -4.604218577s
|
||||
2025/03/20 09:20:12 vzVcVtTQ: acquire success 16 l 2.999743618s
|
||||
2025/03/20 09:20:19 valCDRmB: acquire: vzVcVtTQ lease expired -1.988170854s
|
||||
2025/03/20 09:20:19 valCDRmB: acquire success 19 l 2.999667662s
|
||||
2025/03/20 09:20:22 valCDRmB: Release 22
|
||||
2025/03/20 09:20:22 vzVcVtTQ: refresher: exit expired -4.943386258s
|
||||
2025/03/20 09:20:23 RJYqYuLF: acquire success 23 l 2.999774783s
|
||||
2025/03/20 09:20:30 KaeJpVvL: acquire: RJYqYuLF lease expired -1.222157296s
|
||||
2025/03/20 09:20:30 KaeJpVvL: acquire success 26 l 2.999897268s
|
||||
2025/03/20 09:20:33 KaeJpVvL: Release 30
|
||||
2025/03/20 09:20:33 RJYqYuLF: refresher: exit expired -4.429889332s
|
||||
2025/03/20 09:20:34 leVdobnP: acquire success 31 l 2.999770816s
|
||||
2025/03/20 09:20:41 DFnmWean: acquire: leVdobnP lease expired -1.756292497s
|
||||
2025/03/20 09:20:41 DFnmWean: acquire success 34 l 2.999905276s
|
||||
2025/03/20 09:20:44 DFnmWean: Release 38
|
||||
2025/03/20 09:20:44 leVdobnP: refresher: exit expired -4.84260629s
|
||||
... Passed -- 59.3 1 5454 660
|
||||
--- PASS: TestPartitionRecoveryReliableNoClerk5C (59.30s)
|
||||
=== RUN TestPartitionRecoveryUnreliableNoClerk5C
|
||||
Test (5C): controllers with leased leadership ... (unreliable network)...
|
||||
2025/03/20 09:21:01 oBRWPJFn: acquire success 1 l 2.999668901s
|
||||
2025/03/20 09:21:08 WCfEtCSF: acquire: oBRWPJFn lease expired -1.960469635s
|
||||
2025/03/20 09:21:08 WCfEtCSF: acquire success 4 l 2.989064006s
|
||||
2025/03/20 09:21:19 WCfEtCSF: Release 15
|
||||
2025/03/20 09:21:20 oBRWPJFn: refresher: exit expired -13.623366094s
|
||||
2025/03/20 09:21:25 BKOHUPgK: acquire success 16 l 2.974368151s
|
||||
2025/03/20 09:21:32 dpZEDTAn: acquire: BKOHUPgK lease expired -1.266079689s
|
||||
2025/03/20 09:21:32 dpZEDTAn: acquire success 19 l 2.986737971s
|
||||
2025/03/20 09:21:42 dpZEDTAn: Release 29
|
||||
2025/03/20 09:21:43 BKOHUPgK: refresher: exit expired -12.408069097s
|
||||
2025/03/20 09:21:50 TiapOztE: acquire: dpZEDTAn lease expired -4.992859225s
|
||||
2025/03/20 09:21:50 TiapOztE: acquire success 30 l 2.972701594s
|
||||
2025/03/20 09:21:57 aDyCYcpR: acquire: TiapOztE lease expired -1.338848496s
|
||||
2025/03/20 09:21:57 aDyCYcpR: acquire success 33 l 2.99686939s
|
||||
2025/03/20 09:22:07 aDyCYcpR: Release 43
|
||||
2025/03/20 09:22:07 TiapOztE: refresher: exit expired -12.147734461s
|
||||
... Passed -- 86.9 1 4985 420
|
||||
--- PASS: TestPartitionRecoveryUnreliableNoClerk5C (86.88s)
|
||||
=== RUN TestPartitionRecoveryReliableClerks5C
|
||||
Test (5C): controllers with leased leadership ... (reliable network)...
|
||||
2025/03/20 09:22:13 vZrMwEsy: acquire success 1 l 2.999893567s
|
||||
2025/03/20 09:22:20 AFHDpDYV: acquire: vZrMwEsy lease expired -1.657500925s
|
||||
2025/03/20 09:22:20 AFHDpDYV: acquire success 4 l 2.999596975s
|
||||
2025/03/20 09:22:22 AFHDpDYV: Release 6
|
||||
2025/03/20 09:22:22 vZrMwEsy: refresher: exit expired -3.627083489s
|
||||
2025/03/20 09:22:23 tserHLNb: acquire success 7 l 2.999932478s
|
||||
2025/03/20 09:22:29 msIfUgIC: acquire: tserHLNb lease expired -1.13789373s
|
||||
2025/03/20 09:22:29 msIfUgIC: acquire success 10 l 2.999755401s
|
||||
2025/03/20 09:22:31 msIfUgIC: Release 12
|
||||
2025/03/20 09:22:31 tserHLNb: refresher: exit expired -3.083945752s
|
||||
2025/03/20 09:22:32 YLEIZyDn: acquire success 13 l 2.999940475s
|
||||
2025/03/20 09:22:38 TIibzsMc: acquire: YLEIZyDn lease expired -1.017825561s
|
||||
2025/03/20 09:22:38 TIibzsMc: acquire success 16 l 2.999907075s
|
||||
2025/03/20 09:22:40 TIibzsMc: Release 18
|
||||
2025/03/20 09:22:40 YLEIZyDn: refresher: exit expired -2.789136907s
|
||||
2025/03/20 09:22:41 knOnYtxW: acquire success 19 l 2.999891429s
|
||||
2025/03/20 09:22:47 KyiPMsgB: acquire: knOnYtxW lease expired -1.534324297s
|
||||
2025/03/20 09:22:47 KyiPMsgB: acquire success 22 l 2.999822725s
|
||||
2025/03/20 09:22:49 KyiPMsgB: Release 24
|
||||
2025/03/20 09:22:49 knOnYtxW: refresher: exit expired -3.516354686s
|
||||
2025/03/20 09:22:50 wHNCImkl: acquire success 25 l 2.999917928s
|
||||
2025/03/20 09:22:56 CSBcxnyr: acquire: wHNCImkl lease expired -1.051161379s
|
||||
2025/03/20 09:22:56 CSBcxnyr: acquire success 28 l 2.999745303s
|
||||
2025/03/20 09:22:58 CSBcxnyr: Release 31
|
||||
2025/03/20 09:22:58 wHNCImkl: refresher: exit expired -3.241024197s
|
||||
... Passed -- 60.1 1 15934 5124
|
||||
--- PASS: TestPartitionRecoveryReliableClerks5C (60.14s)
|
||||
=== RUN TestPartitionRecoveryUnreliableClerks5C
|
||||
Test (5C): controllers with leased leadership ... (unreliable network)...
|
||||
2025/03/20 09:23:14 ydfNYYir: acquire success 1 l 2.871807366s
|
||||
2025/03/20 09:23:21 KmfOaYym: acquire: ydfNYYir lease expired -1.96910688s
|
||||
2025/03/20 09:23:21 KmfOaYym: acquire success 4 l 2.976357121s
|
||||
2025/03/20 09:23:25 KmfOaYym: Release 9
|
||||
2025/03/20 09:23:26 ydfNYYir: refresher: exit expired -6.960801287s
|
||||
2025/03/20 09:23:27 XErxjiqb: acquire success 10 l 2.994288153s
|
||||
2025/03/20 09:23:34 VQFBAKED: acquire: XErxjiqb lease expired -1.186993995s
|
||||
2025/03/20 09:23:34 VQFBAKED: acquire success 14 l 2.978008397s
|
||||
2025/03/20 09:23:40 VQFBAKED: Release 20
|
||||
2025/03/20 09:23:40 XErxjiqb: refresher: exit expired -7.422563867s
|
||||
2025/03/20 09:23:41 IqJHVjsW: acquire success 21 l 2.984528802s
|
||||
2025/03/20 09:23:47 NAaIOMcb: acquire: IqJHVjsW lease expired -1.19246442s
|
||||
2025/03/20 09:23:48 NAaIOMcb: acquire success 25 l 2.521727902s
|
||||
2025/03/20 09:23:53 NAaIOMcb: Release 30
|
||||
2025/03/20 09:23:53 IqJHVjsW: refresher: exit expired -7.130118022s
|
||||
2025/03/20 09:23:54 pwTkolYO: acquire success 32 l 2.761741697s
|
||||
2025/03/20 09:24:01 GAueeCFX: acquire: pwTkolYO lease expired -1.496813006s
|
||||
2025/03/20 09:24:01 GAueeCFX: acquire success 34 l 2.977558093s
|
||||
2025/03/20 09:24:06 GAueeCFX: Release 39
|
||||
2025/03/20 09:24:06 pwTkolYO: refresher: exit expired -6.843949894s
|
||||
2025/03/20 09:24:09 FIDtQSlF: acquire: GAueeCFX lease expired -15.508321ms
|
||||
2025/03/20 09:24:09 FIDtQSlF: acquire success 40 l 2.998464382s
|
||||
2025/03/20 09:24:16 wExaLSov: acquire: FIDtQSlF lease expired -1.874162487s
|
||||
2025/03/20 09:24:16 wExaLSov: acquire success 43 l 2.860519358s
|
||||
2025/03/20 09:24:21 wExaLSov: Release 48
|
||||
2025/03/20 09:24:21 FIDtQSlF: refresher: exit expired -6.924846198s
|
||||
... Passed -- 68.4 1 10469 1186
|
||||
--- PASS: TestPartitionRecoveryUnreliableClerks5C (68.39s)
|
||||
PASS
|
||||
ok 6.5840/shardkv1 351.349s
|
||||
@ -138,7 +138,7 @@ func (cfg *Config) End() {
|
||||
ops := atomic.LoadInt32(&cfg.ops) // number of clerk get/put/append calls
|
||||
|
||||
fmt.Printf(" ... Passed --")
|
||||
fmt.Printf(" %4.1f %d %5d %4d\n", t, npeers, nrpc, ops)
|
||||
fmt.Printf(" time %4.1fs #peers %d #RPCs %5d #Ops %4d\n", t, npeers, nrpc, ops)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -299,19 +299,6 @@ func (sg *ServerGrp) MakePartition(l int) ([]int, []int) {
|
||||
return p1, p2
|
||||
}
|
||||
|
||||
func (sg *ServerGrp) AllowServersExcept(l int) []int {
|
||||
n := len(sg.srvs) - 1
|
||||
p := make([]int, n)
|
||||
j := 0
|
||||
for i, _ := range sg.srvs {
|
||||
if i != l {
|
||||
p[j] = i
|
||||
j++
|
||||
}
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func (sg *ServerGrp) Partition(p1 []int, p2 []int) {
|
||||
//log.Printf("partition servers into: %v %v\n", p1, p2)
|
||||
for i := 0; i < len(p1); i++ {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user