Main Content

MapReduce 快速入门

随着数据采集设备的数量和类型逐年增加,所收集的绝对数据大小和速率也在快速增长。这些大型数据集可能包含数 GB 或数 TB 的数据,并且可能以每天 MB 或 GB 的数量级增长。收集这类信息不仅提供了获取深入见解的机会,也带来了许多挑战。大多数算法的设计无法在合理的时间内或使用合理的内存量处理大型数据集。利用 MapReduce,您可以应对从大型数据集获取重要见解时所面临的诸多挑战。

什么是 MapReduce?

MapReduce 是一种用于分析无法放入内存的数据集的编程方法。您可能很熟悉 Hadoop® MapReduce,它是一种用于 Hadoop 分布式文件系统 (HDFS™) 的常用实现。MATLAB® 使用 mapreduce 函数提供了一种略有不同的 MapReduce 方法实现。

mapreduce 使用数据存储,基于可分别放入内存的小数据块来处理数据。每个数据块会经历映射阶段,此阶段对要处理的数据进行格式化。之后,中间数据块将经历化简 (Reduce) 阶段,此阶段对中间结果进行聚合,以生成最终结果。映射和化简阶段使用 mapreduce 函数进行编码,这些函数是 mapreduce 的主要输入。map 函数和 reduce 函数有无限多种用于处理数据的组合,因此该方法不仅灵活,而且非常强大,可用于处理大型数据处理任务。

mapreduce 支持自我扩展,以便在多种环境中运行。有关这些功能的详细信息,请参阅Speed Up and Deploy MapReduce Using Other Products

mapreduce 函数的功用在于它能对大型数据集合执行计算。因此,mapreduce 不太适合对正常大小的数据集执行计算,这类数据集可直接加载到计算机内存并使用传统方法进行分析。应将 mapreduce 用于对无法放入内存的数据集执行统计或解析计算。

mapreduce 每次调用的 map 或 reduce 函数都是独立于所有其他函数的。例如,对 map 函数的调用不能依赖于上一次 map 函数调用的输入或结果。最好将此类计算分解为多次 mapreduce 调用。

MapReduce 算法阶段

在到达最终输出之前,mapreduce 会移动输入数据存储中的各个数据块,使其经历多个阶段。下图概述了 mapreduce 的算法阶段。

Illustration of MapReduce algorithm phases: input datastore, map phase, intermediate sorting phase, reduce phase, and output datastore.

该算法包含以下步骤:

  1. mapreduce 使用 [data,info] = read(ds) 从输入数据存储读取数据块,然后调用 map 函数处理该数据块。

  2. map 函数接收数据块,组织数据块或执行前驱计算,然后使用 addaddmulti 函数将键-值对组添加到名为 KeyValueStore 的中间数据存储对象。mapreduce 对 map 函数的调用次数等于输入数据存储中的数据块数目。

  3. map 函数处理完数据存储中的所有数据块后,mapreduce 按照唯一键对中间 KeyValueStore 对象中的所有值进行分组。

  4. 接下来,mapreduce 针对 map 函数添加的每个唯一键调用一次 reduce 函数。每个唯一键可以有多个关联的值。mapreduce 将这些值以 ValueIterator 对象(用于循环访问这些值)的形式传递给 reduce 函数。每个唯一键的 ValueIterator 对象包含了该键的所有关联值。

  5. reduce 函数使用 hasnextgetnext 函数,逐一遍历 ValueIterator 对象中的值。然后,在聚合 map 函数的所有中间结果后,reduce 函数使用 addaddmulti 函数将最终的键-值对组添加到输出。输出中的键顺序与 reduce 函数将其添加到最终 KeyValueStore 对象的顺序相同。即,mapreduce 不会显式对输出进行排序。

    注意

    reduce 函数将最终键-值对组写入到最终 KeyValueStore 对象。mapreduce 将键-值对组从该对象拉入输出数据存储(默认为 KeyValueDatastore 对象)。

MapReduce 计算示例

以下示例使用一项简单的计算(某航班数据集中的平均航程)说明运行 mapreduce 所需的步骤。

准备数据

使用 mapreduce 的第一步是为数据集构造数据存储。数据集的数据存储与 map 和 reduce 函数一样,都是 mapreduce 的必要输入,因为 mapreduce 需要利用数据存储来处理数据块中的数据。

mapreduce 可处理大多数数据存储类型。例如,为 airlinesmall.csv 数据集创建一个 TabularTextDatastore 对象。

ds = tabularTextDatastore('airlinesmall.csv','TreatAsMissing','NA')
ds = 

  TabularTextDatastore with properties:

                      Files: {
                             ' ...\matlab\toolbox\matlab\demos\airlinesmall.csv'
                             }
                    Folders: {
                             ' ...\matlab\toolbox\matlab\demos'
                             }
               FileEncoding: 'UTF-8'
   AlternateFileSystemRoots: {}
      PreserveVariableNames: false
          ReadVariableNames: true
              VariableNames: {'Year', 'Month', 'DayofMonth' ... and 26 more}
             DatetimeLocale: en_US

  Text Format Properties:
             NumHeaderLines: 0
                  Delimiter: ','
               RowDelimiter: '\r\n'
             TreatAsMissing: 'NA'
               MissingValue: NaN

  Advanced Text Format Properties:
            TextscanFormats: {'%f', '%f', '%f' ... and 26 more}
                   TextType: 'char'
         ExponentCharacters: 'eEdD'
               CommentStyle: ''
                 Whitespace: ' \b\t'
    MultipleDelimitersAsOne: false

  Properties that control the table returned by preview, read, readall:
      SelectedVariableNames: {'Year', 'Month', 'DayofMonth' ... and 26 more}
            SelectedFormats: {'%f', '%f', '%f' ... and 26 more}
                   ReadSize: 20000 rows
                 OutputType: 'table'
                   RowTimes: []

  Write-specific Properties:
     SupportedOutputFormats: ["txt"    "csv"    "xlsx"    "xls"    "parquet"    "parq"]
        DefaultOutputFormat: "txt"

之前描述的多个选项在 mapreduce 的上下文中非常有用。mapreduce 函数对数据存储执行 read,以检索要传递到 map 函数的数据。因此,您可以使用 SelectedVariableNamesSelectedFormatsReadSize 选项来直接配置 mapreduce 传递给 map 函数的数据块大小和数据类型。

例如,要选择 Distance(总航程)变量作为唯一要关注的变量,请指定 SelectedVariableNames

ds.SelectedVariableNames = 'Distance';

现在,不论何时对 ds 执行 readreadallpreview 函数,它们都仅返回 Distance 变量的信息。要确认这一点,您可以预览数据存储中的前几行数据。这样,您可以检查 mapreduce 函数将要传递给 map 函数的数据的格式。

preview(ds)
ans =

  8×1 table

    Distance
    ________

      308   
      296   
      480   
      296   
      373   
      308   
      447   
      954 

要查看 mapreduce 将要传递给 map 函数的确切数据,请使用 read

有关可用选项的更多信息和完整摘要,请参阅数据存储

写入 map 函数和 reduce 函数

mapreduce 函数会在执行期间自动调用 map 函数和 reduce 函数,因此这些函数必须满足特定的要求才能正确运行。

  1. map 函数的输入包括 datainfointermKVStore

    • datainfo 是对输入数据存储调用 read 函数的结果,mapreduce 在每次调用 map 函数之前都会自动执行该函数。

    • intermKVStoreKeyValueStore 中间对象的名称,map 函数需要使用该名称添加键-值对组。addaddmulti 函数使用此对象名称添加键-值对组。如果对 map 函数的所有调用都没有向 intermKVStore 中添加键-值对组,则 mapreduce 不会调用 reduce 函数,并且结果数据存储为空。

    下面是一个简单的 map 函数示例:

    function MeanDistMapFun(data, info, intermKVStore)
        distances = data.Distance(~isnan(data.Distance));
        sumLenValue = [sum(distances)  length(distances)];
        add(intermKVStore, 'sumAndLength', sumLenValue);
    end

    此 map 函数只有三行,分别执行了一些简单的操作。第一行过滤出了航程数据块中的所有 NaN 值。第二行使用块的总航程和计数创建了一个二元素向量,第三行将该值向量添加到键为 'sumAndLength'intermKVStore。对 ds 中的所有数据块运行此 map 函数后,intermKVStore 对象将包含各个航程数据块的总航程和计数。

    在您的当前文件夹中将此函数另存为 MeanDistMapFun.m

  2. reduce 函数的输入包括 intermKeyintermValIteroutKVStore

    • intermKey 用于 map 函数所添加的活动键。mapreduce 每次调用 reduce 函数都会根据中间 KeyValueStore 对象中的键指定新的唯一键。

    • intermValIter 是与活动键 intermKey 相关的 ValueIterator。这个 ValueIterator 对象包含与活动键相关的所有值。使用 hasnextgetnext 函数滚动这些值。

    • outKVStore 是最终 KeyValueStore 对象的名称,reduce 函数需要使用该名称添加键-值对组。mapreduceoutKVStore 中获取输出键-值对组并在输出数据存储中返回它们,默认情况下是一个 KeyValueDatastore 对象。如果对 reduce 函数的所有调用都没有向 outKVStore 中添加键-值对组,则 mapreduce 将返回空的数据存储。

    下面是一个简单的 reduce 函数示例:

    function MeanDistReduceFun(intermKey, intermValIter, outKVStore)
        sumLen = [0 0];
        while hasnext(intermValIter)
            sumLen = sumLen + getnext(intermValIter);
        end
        add(outKVStore, 'Mean', sumLen(1)/sumLen(2));
    end

    此 reduce 函数会遍历 intermValIter 中每组航程和计数的值,并在每次执行后保留航程和计数的实时总和。完成此循环后,reduce 函数使用简单的除法计算出总平均航程,然后向 outKVStore 中添加一个键。

    在您的当前文件夹中将此函数另存为 MeanDistReduceFun.m

有关编写更高级的 map 函数和 reduce 函数的信息,请参阅编写 map 函数Write a Reduce Function

运行 mapreduce

有了数据存储、map 函数和 reduce 函数之后,您便可以调用 mapreduce 来执行计算。要计算数据集中的平均航程,请使用 dsMeanDistMapFunMeanDistReduceFun 来调用 mapreduce

outds = mapreduce(ds, @MeanDistMapFun, @MeanDistReduceFun);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%

默认情况下,mapreduce 函数会在命令行中显示进度信息,并返回一个指向当前文件夹中的文件的 KeyValueDatastore 对象。您可以使用 Name,Value 对组参量来调整 'OutputFolder''OutputType''Display' 这三个选项。有关详细信息,请参阅 mapreduce 的参考页。

查看结果

使用 readall 函数从输出数据存储读取键-值对组。

readall(outds)
ans =

  1×2 table

      Key          Value    
    ________    ____________

    {'Mean'}    {[702.1630]}

另请参阅

|

相关主题