来自 电脑知识 2019-09-20 23:05 的文章
当前位置: 威尼斯国际官方网站 > 电脑知识 > 正文

【威尼斯国际官方网站】分布式系统

前言

Mit6.824 是作者在学习一些布满式系统方面包车型大巴知识的时候有的时候见到的,然后就起来尝试跟课。不得不说,海外的课程难度是真的大,七日的小时竟是要学一门 Go 语言,然后还要读散文,进而做MapReduce 实验。
鉴于 MQashqai(MapReduce) 框架要求创立在 DFS(Distributed File System)的底子上贯彻,所以本实验是由此选择多线程来模拟布满式境况。纵然难度上海大学大收缩,然则经过该试验,依旧会让我们对 M奥迪Q7 的为主原理有多少个比较深厚的认知。
做尝试在此以前大家需求先把精彩的 MapReduce 杂文给看了,窝比较提议直接看丹麦语原稿,但一旦时光不丰硕的话,能够平昔在英特网找普通话的翻译版。
刚开首做那么些试验的时候的确是叁只雾水,完全不了解如何出手。后来察觉这些工程有一个自动化测量检验文件(test_test.go),每部分实验都会动用那些测量试验文件里的函数对代码举行测量试验。大家若是本着那些测验函数稳步倒推,然后补全代码就可以。

介绍

通过 布满式系统体系小说,咱们询问了布满式的一对基本概念,假诺写点代码试行一下,那就更加好了。先做个轻易的试验练练手,还记得 MapReduce 吗?,本次实验中会营造多少个 MapReduce 库,紧要能熟稔 Go 语言外加通晓遍布式系统中的容错机制。首先写个二个粗略的 MapReduce 程序,再写一个 Master,它不仅可以分配任务给 worker 並且能管理 worker 试行错误。接口参谋诗歌描述。

Part I: Map/Reduce input and output

第一有的是先达成一个顺序版(sequential)的MXC90,让大家对 MLX570的流程有二个光景的认知,并且实现doMap()doReduce() 八个函数。
其满含七个测验函数TestSequentialSingle()TestSequentialMany()

尝试境况

不会令你从零最初撸代码啦,还不快 git clone ?

$ git clone git://g.csail.mit.edu/6.824-golabs-2016 6.824
$ cd 6.824
$ ls
Makefile src

MapReduce 代码协理顺序试行和遍布式试行。顺序实践代表 Map 先实施,当有着 Map 职务都做到了再进行Reduce,这种格局只怕功能比很低,然而正如便利调节和测量试验,终究串行。遍及式实践运维了成都百货上千 worker 线程,他们并行实施 Map 职务,然后实行 Reduce 职分,这种形式效能越来越高,当然更难落成和调剂。

TestSequentialSingle()

各样map worker管理四个文本,所以map worker的数据就约等于文件的数量。
测量检验单个map worker 和 reduce worker。

func TestSequentialSingle(t *testing.T) {
    mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

预备:熟识代码

mapreduce 包提供了一个回顾的 MapReduce 顺序施行落到实处。应用只要调用 Distributed() 方法就能够运维三个任务,可是要调度的时候也许需求调用 Sequential().

mapreduce 的周转流程如下:

  1. 应用层要求提供输入文件,贰个 map 函数,贰个 reduce 函数,要启动reduce 职务的多少。

  2. 用这一个参数创设一个 master。它会运行二个 RPC 服务器(master_rpc.go),然后等待 worker 注册(Register())。当有待产生的职分时,schedule() 就能将职务分配给 worker,同一时候也会议及展览开 worker 的错误管理。

  3. master 感觉每一个输入文件应该交付叁个 map 职务管理,然后调用 doMap(),无论直接调用 Sequential() 照旧经过 RPC 给 worker 发送 DoTask 音信都会接触那个操作。每当调用 doMap() 时,它都会去读取相应的文件,以文件内容调用 map 函数并且为每种输入文件发出 nReduce 个文本。由此,每一种 map 义务最后会时有发生 #files x nReduce 个文件。

  4. master 接下来会对各类 reduce 职务至少调用一次 doReduce()doReduce() 首先会收罗 nReduce 个 map 任务发生的文本,然后在各种文件上推行 reduce 函数,最后发生二个结出文件。

  5. master 会调用 mr.merge() 方法将上一步发生负有结果文件聚合到多少个文书中。

故而此番实验就是到填空题,空是:doMap, doReduce,schedule 和 reduce。

任何的艺术基本无需转移,不经常光的研究钻探推进明白全部架构。

TestSequentialMany()

此测验函数测量检验多个 map worker 和五个 reduce worker。
其运作逻辑和TestSequentialSingle类似。

func TestSequentialMany(t *testing.T) {
    mr := Sequential("test", makeInputs(5), 3, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

Part I: Map/Reduce 输入和出口

率先个空 doMap() 函数的功力是读取钦赐文件的内容,施行 mapF 函数,将结果保存在新的文书中;而 doReuce() 读取 doMap 的输出文件,实践 reduceF 函数,将结果存在磁盘中。

写完了就测量试验测量试验,测量试验文件(test_test.go)已经写好了。串行方式测量试验可实行:

$ cd 6.824
$ export "GOPATH=$PWD"  
$ cd "$GOPATH/src/mapreduce"
$ setup ggo_v1.5
$ go test -run Sequential mapreduce/...
ok      mapreduce   2.694s

举个例子您看到的不是 ok,表达还会有 bug 哦。在 common.go 将 debugEnbale 设置成 true,然后运维 go test -run Sequential mapreduce/... -v,能够看来更详实的出口:

$ env "GOPATH=$PWD/../../" go test -v -run Sequential mapreduce/...
=== RUN   TestSequentialSingle
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
--- PASS: TestSequentialSingle (1.34s)
=== RUN   TestSequentialMany
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
--- PASS: TestSequentialMany (1.33s)
PASS
ok      mapreduce   2.672s

Sequential()

测量试验函数将工作名称,测试文件,reduce 的数量,用户定义的 map 函数,reduce 函数三个实参传递给Sequential()

// Sequential runs map and reduce tasks sequentially, waiting for each task to
// complete before running the next.
func Sequential(jobName string, files []string, nreduce int,
    mapF func(string, string) []KeyValue,
    reduceF func(string, []string) string,
) (mr *Master) {
    mr = newMaster("master")
    go mr.run(jobName, files, nreduce, func(phase jobPhase) {
        switch phase {
        case mapPhase:
            for i, f := range mr.files {
                doMap(mr.jobName, i, f, mr.nReduce, mapF)
            }
        case reducePhase:
            for i := 0; i < mr.nReduce; i++ {
                doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
            }
        }
    }, func() {
        mr.stats = []int{len(files) + nreduce}
    })
    return
}

Sequential()第一获得二个Master 对象的指针,然后利用函数闭包运转Master.run()

Part II: 单机词频总计

形成了第一部分,大家可以伊始营造友好首先个 MapReduce 系统:词频总计器。没有错还是填空题:mapF 和 reduceF,让 wc.go 能够总结出各个单词出现的次数。大家的测验文件之中独有日语,所以二个单词就是三翻五次出现字母,决断二个假名仿照效法标准库 unicode.IsLetter

测量检验文件是 6.824/src/main/pg-*.txt,不要紧先编写翻译试试:

$ cd 6.824
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt
# command-line-arguments
./wc.go:14: missing return at end of function
./wc.go:21: missing return at end of function

本来通过持续,终归空还没填呢。mapF 的参数是测验文件名和其内容,分割成单词,重返 []mapreduce.KeyValue,KeyValue:单词-频次。轮到 reduceF 函数了,它会指向每一种 key(单词) 调用一回,参数是有些单词以及这些单词在装有测验文件中的 mapF 结果。

写好了,便可测验:

$ cd "$GOPATH/src/main"
$ time go run wc.go master sequential pg-*.txt
master: Starting Map/Reduce task wcseq
Merge: read mrtmp.wcseq-res-0
Merge: read mrtmp.wcseq-res-1
Merge: read mrtmp.wcseq-res-2
master: Map/Reduce task completed
14.59user 3.78system 0:14.81elapsed

末段的结果保存在 mrtmp.wcseq 文件中。运营 $ rm mrtmp.* 删除全部的中间数据文件。

运行 sort -n -k2 mrtmp.wcseq | tail -10,如若见到的和底下的同等,表达您写对了。

$ 
he: 34077
was: 37044
that: 37495
I: 44502
in: 46092
a: 60558
to: 74357
of: 79727
and: 93990
the: 154024

可见直接运转 $sh ./test-wc.sh

小提示: strings.FieldFunc 能够将贰个 string 分割成多个部分,strconv 包中有函数可将 string 调换来 int。

Master.run()

// run executes a mapreduce job on the given number of mappers and reducers.
//
// First, it divides up the input file among the given number of mappers, and
// schedules each task on workers as they become available. Each map task bins
// its output in a number of bins equal to the given number of reduce tasks.
// Once all the mappers have finished, workers are assigned reduce tasks.
//
// When all tasks have been completed, the reducer outputs are merged,
// statistics are collected, and the master is shut down.
//
// Note that this implementation assumes a shared file system.
func (mr *Master) run(jobName string, files []string, nreduce int,
    schedule func(phase jobPhase),
    finish func(),
) {
    mr.jobName = jobName
    mr.files = files
    mr.nReduce = nreduce

    fmt.Printf("%s: Starting Map/Reduce task %sn", mr.address, mr.jobName)

    schedule(mapPhase)
    schedule(reducePhase)
    finish()
    mr.merge()

    fmt.Printf("%s: Map/Reduce task completedn", mr.address)

    mr.doneChannel <- true
}

Part III: 分布式 MapReduce

MapReduce 让开荒者最爽的地点是不需求关注代码是在多台机器并行推行的。但大家今日的落到实处是 master 把 map 和 reduce 任务一个二个进行。即便这种完毕情势概念上很简短,不过质量并不是相当高。接下来大家来促成一个并发的 MapReduce,它会调用四个 worker 线程去实行职务,那样能够越来越好地使用多核CPU。当然我们的实验不是真署在多台机械上而是用 channel 去模拟分布式总括。

鉴于是出新,所以需求调解者 master 线程,它承担给 worker 分发职责,何况直接等待直到全部 worker 完毕任务。为了让大家的试验尤其真实,master 只好通过 RPC 的办法与 worker 通信。worker 代码(mapreduce/worker.go)已经筹算好了,它用于运行 worker。

下叁个空是 schedule.go 中的 schedule(),那几个措施担任给 worker 分发 map 和 reduce 任务,当全数职责到位后回去。

master.go 中的 run() 方法会先调用 schedule(),然后调用 merge() 把每种 reduce 职责的出口文件整合到二个文书之中。schedule 只需求报告 worker 输入文件的名字 (mr.files[task]) 和职责 task,worker 自身了然从何地读取也明白把结果写到哪个文件之中。master 通过 RPC 调用 Worker.DoTask 通告 worker 开始新职分,同不时间还有只怕会在 RPC 参数中包括二个 DoTaskArgs 对象。

当八个 worker 希图截止能够干活时,它会向 master 发送贰个 Register RPC,注册的还要还有大概会把那么些 worker 的有关音信放入 mr.registerChannel。所以 schedule 应该经过读取那些 channel 管理新 worker 的登记。

眼前正在运作的 job 音信都在 Master 中定义。注意,master 不需求通晓 Map 或 Reduce 具体进行的是怎样代码;当贰个 worker 被 wc.go 创设时就曾经教导了 Map 和 Reduce 函数的音信。

运行 $ go test -run TestBasic mapreduce/... 可进展基础测验。

小提醒: master 应该相互的发送 RPC 给 worker,那样 worker 能够并发推行义务。可参看 Go RPC 文档。

小提示: master 应该等四个 worker 完毕当前职责后即时为它分配一个新职责。等待 master 响应的线程能够用 channel 作为共同工具。Concurrency in Go 有详尽的 channel 用法。

小提示: 追踪 bug 最简便的方法正是在代码加入 debug(),然后实践 go test -run TestBasic mapreduce/... > out,out 就能够含有调节和测量检验音信。最要紧的切磋你原感到的输出和真正的出口为何区别。

注:当前的代码试运作在三个 Unix 进度中,并且它亦可利用一台机械的多核。要是是要布局在多台机器上,则要修改代码让 worker 通过 TCP 实际不是 Unix-domain sockets 通信。别的还要求贰个网络文件系统分享存款和储蓄。

doMap()

doMap()doReduce()是亟需我们去贯彻的函数。
doMap()的贯彻首假若将客商定义的MapFunc()切割的文书,通过 hash 分到 'nReduce'个切丝中去。

func doMap(
    jobName string, // the name of the MapReduce job
    mapTaskNumber int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(file string, contents string) []KeyValue,
) {
    // read contents from 'infile'
    dat,err := ioutil.ReadFile(inFile)
    if err != nil {
        log.Fatal("doMap: readFile ", err)
    }

    //transfer data into ‘kvSlice’ according to the mapF()
    kvSlice := mapF(inFile, string(dat))

    //divide the ‘kvSlice’ into 'reduceKv' according to the ihash()
    var reduceKv [][]KeyValue // temporary variable which will be written into reduce files
    for i:=0;i<nReduce;i++ {
        s1 := make([]KeyValue,0)
        reduceKv = append(reduceKv, s1)
    }
    for _,kv := range kvSlice{
        hash := ihash(kv.Key) % nReduce
        reduceKv[hash] = append(reduceKv[hash],kv)
    }

    //write 'reduceKv' into ‘nReduce’ JSON files
    for i := 0;i<nReduce;i++ {
        file,err := os.Create(reduceName(jobName,mapTaskNumber,i))
        if err != nil {
            log.Fatal("doMap: create ", err)
        }

        enc := json.NewEncoder(file)
        for _, kv := range reduceKv[i]{
            err := enc.Encode(&kv)
            if err != nil {
                log.Fatal("doMap: json encodem ", err)
            }
        }

        file.Close()

    }
}

Part IV: 管理 worker 实践错误

本小节要让您的 master 能够管理职分实行倒闭的 worker。由于 MapReduce 中 worker 并没有长久状态,所以拍卖起来相对轻巧。假设叁个 worker 试行破产了,master 向 worker 发送的别样三个 RPC 都或然停业,比方超时。由此,要是退步,master 应该把那个职责指派给另为三个worker。

二个 RPC 失利并不一定代表 worker 退步,有望是某些 worker 平常运维但 master 不或者赢获得它的新闻。所以或然会出七个 worker 同失常间实行同贰个职务。但是因为各种职分都以幂等的,贰个职分被试行四回是没啥影响。

咱俩只要它不会倒闭,所以无需管理 master 退步的事态。让 master 能够容错是相持困苦的,因为它保持着水滴石穿的状态,当它失利后我们必要还原它的景况以保障它能够一而再工作。

test_test.go 还剩最终八个测量试验。测有贰个 worker 失利的图景和有广大 worker 退步的图景。运转可测量试验:$ go test -run Failure mapreduce/...

doReduce()

doReduce()首若是将 key 值一样的 value 打包发送给顾客定义的 ReduceFunc(),获得多个新的 kv对,key 值不改变,而value值则是ReduceFunc()的重返值,排序,最终将新的 kv对 切成片写入文件。

type ByKey []KeyValue
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i],a[j] = a[j],a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTaskNumber int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //read kv slice from the json file
    var kvSlice []KeyValue
    for i := 0;i<nMap;i++{
        //file, _ := os.OpenFile(reduceName(jobName,i,reduceTaskNumber), os.O_RDONLY, 0666)
        file,err := os.Open(reduceName(jobName,i,reduceTaskNumber))
        if err != nil {
            log.Fatal("doReduce: open ", err)
        }
        var kv KeyValue
        dec := json.NewDecoder(file)
        for{
            err := dec.Decode(&kv)
            kvSlice = append(kvSlice,kv)
            if err == io.EOF {
                break
            }
        }
        file.Close()
        /********/
        //此处如果用 defer,可能会造成文件开启过多,造成程序崩溃
        /********/
    }

    //sort the intermediate kv slices by key
    sort.Sort(ByKey(kvSlice))

    //process kv slices in the reduceF()
    var reduceFValue []string
    var outputKv []KeyValue
    var preKey string = kvSlice[0].Key
    for i,kv := range kvSlice{
        if i == (len(kvSlice) - 1) {
            reduceFValue = append(reduceFValue, kv.Value)
            outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
        } else {
                if kv.Key != preKey {
                    outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
                    reduceFValue = make([]string, 0)
                }
                reduceFValue = append(reduceFValue, kv.Value)
        }

        preKey = kv.Key
    }

    //write the reduce output as JSON encoded kv objects to the file named outFile
    file,err := os.Create(outFile)
    if err != nil {
        log.Fatal("doRuduce: create ", err)
    }
    defer file.Close()

    enc := json.NewEncoder(file)
    for _, kv := range outputKv{
        err := enc.Encode(&kv)
        if err != nil {
            log.Fatal("doRuduce: json encode ", err)
        }
    }
}

Part V: 反向索引(可选)

挑战性:

词频总计固然是 MapReduce 最优良的二个用到,不过在大范围数据应用不日常用。试试写个反向索引应用。

反向索引在管理器科学中运用大范围,特别在文书档案寻觅世界中极其实用。一般的话,多个反向索引正是贰个从数据到多少特征的映照。例如,在文书档案搜索中,那个映射可能正是非同常常词与文书档案名称的照耀。

main/ii.go 的全体布局跟 wc.go 相似。修改 mapF 和 reduceF 让它们成立反向索引。运维 ii.go 应该出口贰个元组列表,每一行的格式如下:

$ go run ii.go master sequential pg-*.txt
$ head -n5 mrtmp.iiseq
A: 16 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
ABC: 2 pg-les_miserables.txt,pg-war_and_peace.txt
ABOUT: 2 pg-moby_dick.txt,pg-tom_sawyer.txt
ABRAHAM: 1 pg-dracula.txt
ABSOLUTE: 1 pg-les_miserables.txt

你的代码应该通过 test-ii.sh 的测量检验:

$ sort -k1,1 mrtmp.iiseq | sort -snk2,2 mrtmp.iiseq | grep -v '16' | tail -10
women: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
won: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wonderful: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
words: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worked: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worse: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wounded: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yes: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
younger: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yours: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt

Part II: Single-worker word count

第二有个别是兑现mapF()reduceF()函数,来促成通过逐条M兰德途睿欧总计词频的功力。
比较轻便,就直接放代码了。

func mapF(filename string, contents string) []mapreduce.KeyValue {
    f := func(c rune) bool {
        return !unicode.IsLetter(c)
    }
    var strSlice []string = strings.FieldsFunc(contents,f)
    var kvSlice []mapreduce.KeyValue
    for _,str := range strSlice {
        kvSlice = append(kvSlice, mapreduce.KeyValue{str, "1"})
    }

    return kvSlice
}

func reduceF(key string, values []string) string {
    var cnt int64
    for _,str := range values{
        temp,err := strconv.ParseInt(str,10,64)
        if(err != nil){
            fmt.Println("wc :parseint ",err)
        }
        cnt += temp
    }
    return strconv.FormatInt(cnt,10)
}

由此全方位测量试验

运作 src/main/test-mr.sh 可测量检验本次试验的全部情节。假若全勤因而,能够观察:

$ sh ./test-mr.sh
==> Part I
ok      mapreduce   3.053s

==> Part II
Passed test

==> Part III
ok      mapreduce   1.851s

==> Part IV
ok      mapreduce   10.650s

==> Part V (challenge)
Passed test

Part III: Distributing MapReduce tasks && Part IV: Handling worker failures

其三部分和第四部分能够协同来做,重借使落成schedule(),达成贰个透过线程并发推行map worker 和 reduce worker 的 M昂科拉 框架。框架通过 RPC 来效仿遍布式计算,并要带有 worker 的容灾功用。

本文由威尼斯国际官方网站发布于电脑知识,转载请注明出处:【威尼斯国际官方网站】分布式系统

关键词: