6.5840/src/mr/worker.go

189 lines
4.1 KiB
Go

package mr
import "fmt"
import "log"
import "net/rpc"
import "hash/fnv"
import "os"
import "io"
import "encoding/json"
//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
Key string
Value string
}
//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}
var id int
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
// Your worker implementation here.
registerReply := Register()
id = registerReply.WorkerId
done := false
for !done {
reply := GetJob(GetJobArgs{id})
if reply.Finished {
done = true
continue
}
if reply.JobType == Map {
doMap(mapf, reply.MapJob)
} else if reply.JobType == Reduce {
doReduce(reducef, reply.ReduceJob)
}
}
// uncomment to send the Example RPC to the coordinator.
// CallExample()
}
func doMap(mapf func(string, string) []KeyValue, job MapJob) {
file, err := os.Open(job.InputFile)
if err != nil {
log.Fatalf("[Error] Could not open file %s", job.InputFile)
}
content, err := io.ReadAll(file)
if err != nil {
log.Fatalf("[Error] Could not read file %s", job.InputFile)
}
file.Close()
kva := mapf(job.InputFile, string(content))
sort.Sort(ByKey(kva))
partitions := make([][]KeyValue, job.ReducerCount)
for _, v := range kva {
i := ihash(v.Key) % job.ReducerCount
partitions[i] = append(partitions[i], v)
}
intermediateFiles := make([]string, job.ReducerCount)
for i, v := range partitions {
filename := fmt.Sprintf("mr-%v-%v.txt", job.Index, i)
file, err := os.Open(filename);
if err != nil {
log.Fatalf("[Error] Could not open file %s", filename)
}
enc := json.NewEncoder(file)
for _, kv := range v {
err := enc.Encode(&kv)
if err != nil {
log.Fatalf("[Error] Could not write to file %s", filename)
}
}
intermediateFiles[i] = filename
file.Close()
}
Finish(MapResult{job.InputFile, intermediateFiles, id})
}
//
// example function to show how to make an RPC call to the coordinator.
//
// the RPC argument and reply types are defined in rpc.go.
//
func CallExample() {
// declare an argument structure.
args := ExampleArgs{}
// fill in the argument(s).
args.X = 99
// declare a reply structure.
reply := ExampleReply{}
// send the RPC request, wait for the reply.
// the "Coordinator.Example" tells the
// receiving server that we'd like to call
// the Example() method of struct Coordinator.
ok := call("Coordinator.Example", &args, &reply)
if ok {
// reply.Y should be 100.
fmt.Printf("reply.Y %v\n", reply.Y)
} else {
fmt.Printf("call failed!\n")
}
call("Coordinatior.Wrong", &args, &reply);
}
func Register() RegisterReply {
var args RegisterArgs
var reply RegisterReply
call("Coordinator.Register", &args, &reply)
return reply
}
func GetJob(args GetJobArgs) GetJobReply {
var reply GetJobReply
call("Coordinator.GetJob", &args, &reply)
return reply
}
func Finish(args MapResult) FinishReply {
var reply FinishReply
call("Coordinator.Finish", &args, &reply)
return reply
}
//
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := coordinatorSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()
err = c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}