// Post-patch contents of package mr, reconstructed from a whitespace-collapsed
// diff of src/mr/coordinator.go, src/mr/rpc.go and src/mr/worker.go, with the
// defects of that patch fixed.
//
// The same patch also touches src/main/mrworker.go (package main): the
// failure message in loadPlugin becomes
//
//	log.Fatalf("cannot load plugin %v: %s", filename, err)
//
// Imports required by the code below: encoding/json, errors, fmt, hash/fnv,
// io, log, net, net/http, net/rpc, os, sort, strconv, sync, time.

// ---------------------------------------------------------------------------
// coordinator.go
// ---------------------------------------------------------------------------

// status is the lifecycle state of a job or of a worker.
type status uint8

const (
	idle      status = iota // job not handed out yet / worker not running a job
	progress                // job handed to a worker, no result reported yet
	completed               // job result has been reported
)

// Coordinator hands out map jobs, then reduce jobs, to workers over RPC and
// records their results.
type Coordinator struct {
	m sync.Mutex // guards every field below

	inputFiles        []string            // distinct input files; position is the map task index
	mapJobStatus      map[string]status   // input file -> state of its map job
	reduceJobStatus   map[int]status      // reducer number -> state of its reduce job
	workerStatus      map[int]status      // worker ID -> idle/progress
	intermediateFiles map[string][]string // input file -> one intermediate file per reducer

	nReduce     int
	workerCount int // next worker ID to hand out
}

// areMapJobsDone reports whether every map job has completed.
// The caller must hold c.m.
func (c *Coordinator) areMapJobsDone() bool {
	for _, st := range c.mapJobStatus {
		if st != completed {
			return false
		}
	}
	return true
}

// areReduceJobsDone reports whether every reduce job has completed.
// The caller must hold c.m.
func (c *Coordinator) areReduceJobsDone() bool {
	for _, st := range c.reduceJobStatus {
		if st != completed {
			return false
		}
	}
	return true
}

// Example is the sample RPC handler; it replies with args.X + 1.
//
// The RPC argument and reply types are defined in the rpc.go section below.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	reply.Y = args.X + 1
	return nil
}

// Register assigns the calling worker a fresh ID and records it as idle.
func (c *Coordinator) Register(args *RegisterArgs, reply *RegisterReply) error {
	c.m.Lock()
	defer c.m.Unlock()

	reply.WorkerId = c.workerCount
	c.workerCount++
	c.workerStatus[reply.WorkerId] = idle
	return nil
}

// GetJob hands the calling worker its next job.
//
// Map jobs are served first; reduce jobs are only served once every map job
// has completed. When nothing can be handed out right now the reply carries
// JobType Wait, and when all work is complete it carries Finished == true.
func (c *Coordinator) GetJob(args *GetJobArgs, reply *GetJobReply) error {
	// The lock must be held for the whole handler: the status maps are
	// shared with every other RPC goroutine.
	c.m.Lock()
	defer c.m.Unlock()

	// Default to "nothing to do yet". The zero JobType is Map, so leaving the
	// reply untouched would make the worker run a map job on an empty name.
	*reply = GetJobReply{JobType: Wait}

	if !c.areMapJobsDone() {
		for index, filename := range c.inputFiles {
			if c.mapJobStatus[filename] != idle {
				continue
			}
			c.mapJobStatus[filename] = progress
			c.workerStatus[args.WorkerId] = progress
			reply.JobType = Map
			reply.MapJob = MapJob{
				Index:        index,
				InputFile:    filename,
				ReducerCount: c.nReduce,
			}
			return nil
		}
		return nil // all map jobs are in flight; the worker should retry
	}

	if !c.areReduceJobsDone() {
		for r := 0; r < c.nReduce; r++ {
			if c.reduceJobStatus[r] != idle {
				continue
			}
			// Finish guarantees each completed map job reported exactly
			// nReduce files, so indexing by r is in range.
			files := make([]string, 0, len(c.inputFiles))
			for _, filename := range c.inputFiles {
				files = append(files, c.intermediateFiles[filename][r])
			}
			c.reduceJobStatus[r] = progress
			c.workerStatus[args.WorkerId] = progress
			reply.JobType = Reduce
			reply.ReduceJob = ReduceJob{ReducerNumber: r, IntermediateFiles: files}
			return nil
		}
		return nil // all reduce jobs are in flight; the worker should retry
	}

	reply.Finished = true
	return nil
}

// Finish records the result of a map job.
//
// It returns an error for an input file the coordinator does not know, or
// when the number of reported intermediate files differs from nReduce. A
// repeated report for an already completed job is ignored.
func (c *Coordinator) Finish(args *MapResult, reply *FinishReply) error {
	c.m.Lock()
	defer c.m.Unlock()

	st, known := c.mapJobStatus[args.InputFile]
	if !known {
		return fmt.Errorf("unknown map input %q", args.InputFile)
	}
	if len(args.IntermediateFiles) != c.nReduce {
		return fmt.Errorf("map result for %q has %d intermediate files, want %d",
			args.InputFile, len(args.IntermediateFiles), c.nReduce)
	}

	c.workerStatus[args.WorkerId] = idle
	if st == completed {
		return nil
	}
	c.intermediateFiles[args.InputFile] = args.IntermediateFiles
	c.mapJobStatus[args.InputFile] = completed
	return nil
}

// FinishReduce records the result of a reduce job. It returns an error for a
// reducer number the coordinator does not know.
func (c *Coordinator) FinishReduce(args *ReduceResult, reply *FinishReply) error {
	c.m.Lock()
	defer c.m.Unlock()

	if _, known := c.reduceJobStatus[args.ReducerNumber]; !known {
		return fmt.Errorf("unknown reducer number %d", args.ReducerNumber)
	}
	c.reduceJobStatus[args.ReducerNumber] = completed
	c.workerStatus[args.WorkerId] = idle
	return nil
}

// server starts a goroutine that listens for RPCs from the workers on the
// coordinator's UNIX-domain socket.
func (c *Coordinator) server() {
	if err := rpc.Register(c); err != nil {
		log.Fatal("register error:", err)
	}
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := coordinatorSock()
	os.Remove(sockname) // best effort: drop a stale socket from an earlier run
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

// Done reports whether the entire job has finished, i.e. every map job and
// every reduce job has completed. main/mrcoordinator.go calls it periodically.
func (c *Coordinator) Done() bool {
	c.m.Lock()
	defer c.m.Unlock()

	return c.areMapJobsDone() && c.areReduceJobsDone()
}

// newCoordinator builds the coordinator state for the given input files and
// reducer count without starting the RPC server.
func newCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{
		mapJobStatus:      make(map[string]status),
		reduceJobStatus:   make(map[int]status),
		workerStatus:      make(map[int]status),
		intermediateFiles: make(map[string][]string),
		nReduce:           nReduce,
	}

	for _, f := range files {
		if _, dup := c.mapJobStatus[f]; dup {
			continue // a repeated input must not get a second task index
		}
		c.inputFiles = append(c.inputFiles, f)
		c.mapJobStatus[f] = idle
		c.intermediateFiles[f] = make([]string, nReduce)
	}
	for r := 0; r < nReduce; r++ {
		c.reduceJobStatus[r] = idle
	}
	return c
}

// MakeCoordinator creates a Coordinator and starts its RPC server.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := newCoordinator(files, nReduce)
	c.server()
	return c
}

// ---------------------------------------------------------------------------
// rpc.go
// ---------------------------------------------------------------------------

// ExampleArgs is the argument of the sample Example RPC.
type ExampleArgs struct {
	X int
}

// ExampleReply is the reply of the sample Example RPC.
type ExampleReply struct {
	Y int
}

// JobType tells a worker what kind of work a GetJobReply carries.
type JobType uint8

const (
	Map    JobType = iota // run MapJob
	Reduce                // run ReduceJob
	Wait                  // nothing available right now; ask again later
)

// MapJob describes one map task.
type MapJob struct {
	Index        int    // map task number, used in intermediate file names
	InputFile    string // file whose contents are passed to the map function
	ReducerCount int    // number of partitions to split the output into
}

// ReduceJob describes one reduce task.
type ReduceJob struct {
	ReducerNumber     int
	IntermediateFiles []string // one file per map job, all for this reducer
}

// MapResult is the argument of Coordinator.Finish.
type MapResult struct {
	InputFile         string
	IntermediateFiles []string // indexed by reducer number
	WorkerId          int
}

// ReduceResult is the argument of Coordinator.FinishReduce.
type ReduceResult struct {
	ReducerNumber int
	OutputFile    string
	WorkerId      int
}

// RegisterArgs is the (empty) argument of Coordinator.Register.
type RegisterArgs struct {
	// empty
}

// RegisterReply carries the ID assigned to a newly registered worker.
type RegisterReply struct {
	WorkerId int
}

// GetJobArgs is the argument of Coordinator.GetJob.
type GetJobArgs struct {
	WorkerId int
}

// GetJobReply is the reply of Coordinator.GetJob. Only the job matching
// JobType is meaningful; Finished means there is no work left at all.
type GetJobReply struct {
	JobType JobType

	MapJob    MapJob
	ReduceJob ReduceJob

	Finished bool
}

// FinishReply is the (empty) reply of Coordinator.Finish and FinishReduce.
type FinishReply struct {
}

// coordinatorSock cooks up a unique-ish UNIX-domain socket name in /var/tmp
// for the coordinator. Can't use the current directory since Athena AFS
// doesn't support UNIX-domain sockets.
func coordinatorSock() string {
	s := "/var/tmp/5840-mr-"
	s += strconv.Itoa(os.Getuid())
	return s
}

// ---------------------------------------------------------------------------
// worker.go
// ---------------------------------------------------------------------------

// KeyValue is the element type of the slice returned by Map functions.
type KeyValue struct {
	Key   string
	Value string
}

// ByKey sorts a slice of KeyValue by Key.
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// waitInterval is how long a worker pauses before asking again when the
// coordinator has no job to hand out.
const waitInterval = 100 * time.Millisecond

// ihash returns a non-negative hash of key; use ihash(key) % NReduce to
// choose the reduce task number for each KeyValue emitted by Map.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// Worker registers with the coordinator and then runs the jobs it is handed
// until the coordinator reports that everything is finished.
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	workerID := Register().WorkerId

	for {
		reply := GetJob(GetJobArgs{WorkerId: workerID})
		if reply.Finished {
			return
		}

		switch reply.JobType {
		case Map:
			doMap(workerID, mapf, reply.MapJob)
		case Reduce:
			doReduce(workerID, reducef, reply.ReduceJob)
		default:
			time.Sleep(waitInterval)
		}
	}
}

// doMap runs one map job and reports its intermediate files to the
// coordinator. Any failure terminates the worker process.
func doMap(workerID int, mapf func(string, string) []KeyValue, job MapJob) {
	files, err := runMap(mapf, job)
	if err != nil {
		log.Fatalf("[Error] map job for %s failed: %v", job.InputFile, err)
	}
	Finish(MapResult{InputFile: job.InputFile, IntermediateFiles: files, WorkerId: workerID})
}

// runMap applies mapf to the job's input file, partitions the emitted pairs
// with ihash, and writes partition i as JSON to "mr-<Index>-<i>.txt" in the
// current directory. It returns the file names indexed by reducer number.
func runMap(mapf func(string, string) []KeyValue, job MapJob) ([]string, error) {
	if job.ReducerCount <= 0 {
		return nil, fmt.Errorf("invalid reducer count %d", job.ReducerCount)
	}

	content, err := os.ReadFile(job.InputFile)
	if err != nil {
		return nil, fmt.Errorf("reading input: %w", err)
	}

	kva := mapf(job.InputFile, string(content))
	sort.Sort(ByKey(kva))

	partitions := make([][]KeyValue, job.ReducerCount)
	for _, kv := range kva {
		i := ihash(kv.Key) % job.ReducerCount
		partitions[i] = append(partitions[i], kv)
	}

	names := make([]string, job.ReducerCount)
	for i, part := range partitions {
		name := fmt.Sprintf("mr-%v-%v.txt", job.Index, i)
		if err := writeKeyValues(name, part); err != nil {
			return nil, err
		}
		names[i] = name
	}
	return names, nil
}

// writeKeyValues creates (or truncates) filename and writes kvs to it as a
// stream of JSON objects.
func writeKeyValues(filename string, kvs []KeyValue) error {
	// os.Create, not os.Open: the file does not exist yet and must be writable.
	file, err := os.Create(filename)
	if err != nil {
		return fmt.Errorf("creating %q: %w", filename, err)
	}

	enc := json.NewEncoder(file)
	var werr error
	for i := range kvs {
		if werr = enc.Encode(&kvs[i]); werr != nil {
			break
		}
	}
	cerr := file.Close()

	if werr != nil {
		return fmt.Errorf("writing %q: %w", filename, werr)
	}
	if cerr != nil {
		return fmt.Errorf("closing %q: %w", filename, cerr)
	}
	return nil
}

// readKeyValues reads back a file written by writeKeyValues.
func readKeyValues(filename string) ([]KeyValue, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, fmt.Errorf("opening %q: %w", filename, err)
	}
	defer file.Close()

	var kvs []KeyValue
	dec := json.NewDecoder(file)
	for {
		var kv KeyValue
		err := dec.Decode(&kv)
		if errors.Is(err, io.EOF) {
			return kvs, nil
		}
		if err != nil {
			return nil, fmt.Errorf("decoding %q: %w", filename, err)
		}
		kvs = append(kvs, kv)
	}
}

// doReduce runs one reduce job and reports its output file to the
// coordinator. Any failure terminates the worker process.
func doReduce(workerID int, reducef func(string, []string) string, job ReduceJob) {
	out, err := runReduce(reducef, job)
	if err != nil {
		log.Fatalf("[Error] reduce job %d failed: %v", job.ReducerNumber, err)
	}
	FinishReduce(ReduceResult{ReducerNumber: job.ReducerNumber, OutputFile: out, WorkerId: workerID})
}

// runReduce reads every intermediate file of the job, groups the pairs by
// key, calls reducef once per key and writes one "key value" line per key to
// "mr-out-<ReducerNumber>" in the current directory. It returns that name.
//
// NOTE(review): the output name and line format follow the usual convention
// for this lab; confirm against the test harness.
func runReduce(reducef func(string, []string) string, job ReduceJob) (string, error) {
	var kva []KeyValue
	for _, name := range job.IntermediateFiles {
		kvs, err := readKeyValues(name)
		if err != nil {
			return "", err
		}
		kva = append(kva, kvs...)
	}
	sort.Sort(ByKey(kva))

	outName := fmt.Sprintf("mr-out-%d", job.ReducerNumber)
	out, err := os.Create(outName)
	if err != nil {
		return "", fmt.Errorf("creating %q: %w", outName, err)
	}

	werr := writeReduceOutput(out, kva, reducef)
	cerr := out.Close()
	if werr != nil {
		return "", fmt.Errorf("writing %q: %w", outName, werr)
	}
	if cerr != nil {
		return "", fmt.Errorf("closing %q: %w", outName, cerr)
	}
	return outName, nil
}

// writeReduceOutput walks kva, which must be sorted by key, and writes one
// line per distinct key containing the key and reducef's result for it.
func writeReduceOutput(w io.Writer, kva []KeyValue, reducef func(string, []string) string) error {
	for i := 0; i < len(kva); {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		if _, err := fmt.Fprintf(w, "%v %v\n", kva[i].Key, reducef(kva[i].Key, values)); err != nil {
			return err
		}
		i = j
	}
	return nil
}

// CallExample shows how to make an RPC call to the coordinator.
//
// The RPC argument and reply types are defined in the rpc.go section above.
func CallExample() {
	// declare an argument structure and fill in the argument(s).
	args := ExampleArgs{}
	args.X = 99

	// declare a reply structure.
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	// the "Coordinator.Example" tells the
	// receiving server that we'd like to call
	// the Example() method of struct Coordinator.
	ok := call("Coordinator.Example", &args, &reply)
	if ok {
		// reply.Y should be 100.
		fmt.Printf("reply.Y %v\n", reply.Y)
	} else {
		fmt.Printf("call failed!\n")
	}
}

// Register asks the coordinator for a worker ID.
func Register() RegisterReply {
	var args RegisterArgs
	var reply RegisterReply

	call("Coordinator.Register", &args, &reply)
	return reply
}

// GetJob asks the coordinator for the next job. If the RPC itself fails the
// reply carries JobType Wait, so the caller retries instead of running a
// zero-valued map job.
func GetJob(args GetJobArgs) GetJobReply {
	var reply GetJobReply

	if !call("Coordinator.GetJob", &args, &reply) {
		return GetJobReply{JobType: Wait}
	}
	return reply
}

// Finish reports a completed map job to the coordinator.
func Finish(args MapResult) FinishReply {
	var reply FinishReply

	call("Coordinator.Finish", &args, &reply)
	return reply
}

// FinishReduce reports a completed reduce job to the coordinator.
func FinishReduce(args ReduceResult) FinishReply {
	var reply FinishReply

	call("Coordinator.FinishReduce", &args, &reply)
	return reply
}

// call sends an RPC request to the coordinator and waits for the response.
// It usually returns true; it returns false if the call itself reports an
// error, and terminates the process if the coordinator cannot be dialed.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}