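// Package rsm lets a server replicate itself through Raft: the server
// implements the StateMachine interface, calls MakeRSM, and submits
// operations with Submit.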
package rsm
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"6.5840/kvsrv1/rpc"
|
|
"6.5840/labrpc"
|
|
"6.5840/raft1"
|
|
"6.5840/tester1"
|
|
|
|
)
|
|
|
|
// Op is the structure Submit is expected to create in order to run a
// command through Raft. It has no fields yet; the note inside Submit
// suggests a shape such as Op{Id: ..., Req: ...}.
type Op struct {
	// Your definitions here.
	// Field names must start with capital letters,
	// otherwise RPC will break.
}
|
|
|
|
|
|
// StateMachine is the interface a server (i.e., ../server.go) must
// implement when it wants to replicate itself by calling MakeRSM. It
// allows the rsm package to interact with the server for server-specific
// operations.
type StateMachine interface {
	// DoOp executes an operation (e.g., a Get or Put request) and returns
	// its result.
	DoOp(any) any

	// Snapshot returns a snapshot of the server's state as bytes.
	Snapshot() []byte

	// Restore restores the server's state from snapshot bytes.
	Restore([]byte)
}
|
|
|
|
type RSM struct {
|
|
mu sync.Mutex
|
|
me int
|
|
rf *raft.Raft
|
|
applyCh chan raft.ApplyMsg
|
|
maxraftstate int // snapshot if log grows this big
|
|
sm StateMachine
|
|
// Your definitions here.
|
|
}
|
|
|
|
// servers[] contains the ports of the set of
|
|
// servers that will cooperate via Raft to
|
|
// form the fault-tolerant key/value service.
|
|
// me is the index of the current server in servers[].
|
|
// the k/v server should store snapshots through the underlying Raft
|
|
// implementation, which should call persister.SaveStateAndSnapshot() to
|
|
// atomically save the Raft state along with the snapshot.
|
|
// The RSM should snapshot when Raft's saved state exceeds maxraftstate bytes,
|
|
// in order to allow Raft to garbage-collect its log. if maxraftstate is -1,
|
|
// you don't need to snapshot.
|
|
//
|
|
// MakeRSM() must return quickly, so it should start goroutines for
|
|
// any long-running work.
|
|
func MakeRSM(servers []*labrpc.ClientEnd, me int, persister *tester.Persister, maxraftstate int, sm StateMachine) *RSM {
|
|
rsm := &RSM{
|
|
me: me,
|
|
maxraftstate: maxraftstate,
|
|
applyCh: make(chan raft.ApplyMsg),
|
|
sm: sm,
|
|
}
|
|
rsm.rf = raft.Make(servers, me, persister, rsm.applyCh)
|
|
return rsm
|
|
}
|
|
|
|
func (rsm *RSM) Raft() *raft.Raft {
|
|
return rsm.rf
|
|
}
|
|
|
|
|
|
// submit a command to Raft,
|
|
// and wait for it to be committed.
|
|
//
|
|
// returns (executeError, executeResult)
|
|
// if executeError==ErrWrongLeader, client should find new leader
|
|
// and try again.
|
|
func (rsm *RSM) Submit(req any) (rpc.Err, any) {
|
|
|
|
// Submit creates an Op structure to run a command through Raft;
|
|
// for example: op := Op{Id: rsm.nextId, Req: req}, where req is
|
|
// the argument to Submit and rsm.nextId a unique id for the op.
|
|
|
|
// your code here
|
|
return rpc.ErrWrongLeader, nil // i'm dead, try another server.
|
|
}
|