Main Content

编写 map 函数

map 函数在 MapReduce 中的角色

mapreduce 既需要接收数据块并输出中间结果的输入 map 函数,也需要读取中间结果并生成最终结果的输入 reduce 函数。因此,通常将计算分解成两个相关部分,分别由 map 和 reduce 函数来完成。例如,要求数据集中的最大值,可以使用 map 函数求出每个输入数据块中的最大值,然后使用 reduce 函数求出所有中间最大值中的单个最大值。

下图显示 mapreduce 算法的映射阶段。

Illustration of Map phase of MapReduce algorithm: input datastore, map phase, and intermediate sorting phase.

mapreduce 算法的映射阶段包含以下步骤:

  1. mapreduce 使用输入数据存储上的 read 函数读取单个数据块,然后调用 map 函数对该数据块进行处理。

  2. 然后,map 函数对单个数据块进行处理,并使用 addaddmulti 函数向中间 KeyValueStore 对象添加一个或多个键-值对组。

  3. mapreduce 对输入数据存储中的每个数据块重复此过程,因此对 map 函数的调用总数等于数据块数。数据存储的 ReadSize 属性决定数据块数。

当 map 函数处理输入数据存储中的每个数据块时,mapreduce 算法的映射阶段完成。mapreduce 算法的此阶段的结果是一个 KeyValueStore 对象,它包含 map 函数添加的所有键-值对组。在映射阶段后,mapreduce 通过按唯一键对 KeyValueStore 对象中的所有值进行分组,为归约阶段做准备。

map 函数的要求

mapreduce 自动为输入数据存储中的每个数据块调用 map 函数。map 函数必须满足某些基本要求,才能在这些自动调用期间正常运行。这些要求共同确保数据正确通过 mapreduce 算法的映射阶段。

map 函数的输入包括 datainfointermKVStore

  • datainfo 是对输入数据存储调用 read 函数的结果,mapreduce 在每次调用 map 函数之前都会自动执行该函数。

  • intermKVStoreKeyValueStore 中间对象的名称,map 函数需要使用该名称添加键-值对组。addaddmulti 函数使用此对象名称添加键-值对组。如果 map 函数没有向 intermKVStore 对象添加任何键-值对组,则 mapreduce 不会调用 reduce 函数,并且得到的结果数据存储为空。

除了 map 函数的这些基本要求之外,map 函数添加的键-值对组还必须满足以下条件:

  1. 键必须为数值标量、字符向量或字符串。数值键不能为 NaN、复数、逻辑值或稀疏矩阵。

  2. 由 map 函数添加的所有键必须具有相同的类。

  3. 值可以是任何 MATLAB® 对象,包括所有有效的 MATLAB 数据类型。

注意

在使用包含 mapreduce 的其他产品时,上述键-值对组要求可能不同。请参阅相应产品的文档以获得产品特定的键-值对组要求。

map 函数示例

以下是在 mapreduce 示例中使用的一些 map 函数示例。

恒等映射函数

如果 map 函数只是简单地返回 mapreduce 传递给它的内容,则该 map 函数称为恒等映射函数。在 reduce 函数中进行计算之前,可以利用恒等映射函数按唯一键对值进行分组。identityMapper 映射函数文件是示例Tall Skinny QR (TSQR) Matrix Factorization Using MapReduce中使用的映射函数之一。

function identityMapper(data, info, intermKVStore)
  % This mapper function simply copies the data and add them to the
  % intermKVStore as intermediate values.
  x = data.Value{:,:};
  add(intermKVStore,'Identity', x);
end

简单 map 函数

非恒等映射函数的一个最简单示例是 maxArrivalDelayMapper,这是示例Find Maximum Value with MapReduce的映射函数。对于输入数据的每个块,此映射函数都会计算最大到港延误,并向中间 KeyValueStore 添加一个键-值对组。

function maxArrivalDelayMapper (data, info, intermKVStore)
  partMax = max(data.ArrDelay);
  add(intermKVStore, 'PartialMaxArrivalDelay',partMax);
end

高级 map 函数

statsByGroupMapper 是一个更高级的映射函数示例,它是示例Compute Summary Statistics by Group Using MapReduce的映射函数。此映射函数使用嵌套函数计算每个输入数据块的几个统计量(计数、均值、方差等),然后向中间 KeyValueStore 对象添加几个键-值对组。此外,此映射函数使用四个输入参量,而 mapreduce 只接受具有三个输入参量的 map 函数。为了解决此问题,请在调用 mapreduce 的过程中使用匿名函数传入一个额外参数,如示例所示。

function statsByGroupMapper(data, ~, intermKVStore, groupVarName)
  % Data is a n-by-3 table. Remove missing values first
  delays = data.ArrDelay;
  groups = data.(groupVarName);
  notNaN =~isnan(delays);
  groups = groups(notNaN);
  delays = delays(notNaN);
  % Find the unique group levels in this chunk
  [intermKeys,~,idx] = unique(groups, 'stable');
  % Group delays by idx and apply @grpstatsfun function to each group
  intermVals = accumarray(idx,delays,size(intermKeys),@grpstatsfun);
  addmulti(intermKVStore,intermKeys,intermVals);
  function out = grpstatsfun(x)
    n = length(x); % count
    m = sum(x)/n; % mean
    v = sum((x-m).^2)/n; % variance
    s = sum((x-m).^3)/n; % skewness without normalization
    k = sum((x-m).^4)/n; % kurtosis without normalization
    out = {[n, m, v, s, k]};
  end
end

另请参阅

| | | |

相关主题