Main Content

matlab.io.datastore.HadoopFileBased 类

命名空间: matlab.io.datastore

(不推荐)为数据存储添加 Hadoop 文件支持

不推荐使用 matlab.io.datastore.HadoopFileBased。请改用 matlab.io.datastore.HadoopLocationBased

描述

matlab.io.datastore.HadoopFileBased 是一个抽象的 mixin 类,可为您的自定义数据存储添加 Hadoop® 支持。

要使用此 mixin 类,除了从 matlab.io.Datastore 基类继承之外,还必须从 matlab.io.datastore.HadoopFileBased 类继承。键入以下语法作为类定义文件的第一行:

classdef MyDatastore < matlab.io.Datastore & ...
                             matlab.io.datastore.HadoopFileBased 
    ...
end

要添加 Hadoop 支持和并行处理支持,请在您的类定义文件中使用以下行:

classdef MyDatastore < matlab.io.Datastore & ...
                             matlab.io.datastore.Partitionable & ...
                             matlab.io.datastore.HadoopFileBased 
    ...
end

要为自定义数据存储添加 Hadoop 支持,您还必须:

有关创建支持 Hadoop 的自定义数据存储的详细信息和步骤,请参阅Develop Custom Datastore

方法

getLocation(不推荐)Hadoop 中文件的位置
initializeDatastore(不推荐)使用 Hadoop 中的信息初始化数据存储
isfullfile(不推荐)检查数据存储是否读取完整文件

属性

Sealedfalse

有关类属性的信息,请参阅类属性

示例

全部折叠

实现一个支持并行处理和 Hadoop 的数据存储,并使用它将您的数据从 Hadoop 服务器导入 MATLAB®。然后对这些数据使用 tallgather 函数。

创建一个新的 .m 类定义文件,其中包含实现自定义数据存储的代码。您必须将此文件保存在工作文件夹或 MATLAB 路径上的文件夹中。.m 文件的名称必须与对象构造函数的名称相同。例如,如果您希望构造函数的名称为 MyDatastoreHadoop,则脚本文件的名称必须为 MyDatastoreHadoop.m.m 类定义文件必须包含以下步骤:

  • 步骤 1:从数据存储类继承。

  • 步骤 2:定义构造函数和必需的方法。

  • 步骤 3:定义您的自定义文件读取函数。

以下代码显示了可从 Hadoop 服务器读取二进制文件的自定义数据存储示例实现的三个步骤。

%% STEP 1: INHERIT FROM DATASTORE CLASSES
classdef MyDatastoreHadoop < matlab.io.Datastore & ...
        matlab.io.datastore.Partitionable & ...
        matlab.io.datastore.HadoopFileBased
    
    properties (Access = private)
        CurrentFileIndex double
        FileSet matlab.io.datastore.DsFileSet
    end

         
%% STEP 2: DEFINE THE CONSTRUCTOR AND THE REQUIRED METHODS
    methods
        % Define your datastore constructor
        function myds = MyDatastoreHadoop(location,altRoots)
            myds.FileSet = matlab.io.datastore.DsFileSet(location,...
                'FileExtensions','.bin', ...
                'FileSplitSize',8*1024);
            myds.CurrentFileIndex = 1;
             
            if nargin == 2
                 myds.AlternateFileSystemRoots = altRoots;
            end
            
            reset(myds);
        end
        
        % Define the hasdata method
        function tf = hasdata(myds)
            % Return true if more data is available
            tf = hasfile(myds.FileSet);
        end
        
        % Define the read method
        function [data,info] = read(myds)
            % Read data and information about the extracted data
            % See also: MyFileReader()
            if ~hasdata(myds)
                error(sprintf(['No more data to read.\nUse the reset ',... 
                     'method to reset the datastore to the start of ' ,...
                     'the data. \nBefore calling the read method, ',...
                     'check if data is available to read ',...
                     'by using the hasdata method.'])) 
            end
            
            fileInfoTbl = nextfile(myds.FileSet);
            data = MyFileReader(fileInfoTbl);
            info.Size = size(data);
            info.FileName = fileInfoTbl.FileName;
            info.Offset = fileInfoTbl.Offset;
            
            % Update CurrentFileIndex for tracking progress
            if fileInfoTbl.Offset + fileInfoTbl.SplitSize >= ...
                    fileInfoTbl.FileSize
                myds.CurrentFileIndex = myds.CurrentFileIndex + 1 ;
            end
        end
        
        % Define the reset method
        function reset(myds)
            % Reset to the start of the data
            reset(myds.FileSet);
            myds.CurrentFileIndex = 1;
        end
        
        
        % Define the partition method
        function subds = partition(myds,n,ii)
            subds = copy(myds);
            subds.FileSet = partition(myds.FileSet,n,ii);
            reset(subds);
        end
    end      

     
    methods (Hidden = true)   

        % Define the progress method
        function frac = progress(myds)
            % Determine percentage of data read from datastore
            if hasdata(myds) 
               frac = (myds.CurrentFileIndex-1)/...
                             myds.FileSet.NumFiles; 
            else 
               frac = 1;  
            end 
        end
 
        % Define the initializeDatastore method
        function initializeDatastore(myds,hadoopInfo)
            import matlab.io.datastore.DsFileSet;
            myds.FileSet = DsFileSet(hadoopInfo,...
                'FileSplitSize',myds.FileSet.FileSplitSize,...
                'IncludeSubfolders',true, ...
                'FileExtensions','.bin');
            reset(myds);
        end
        
        % Define the getLocation method
        function loc = getLocation(myds)
            loc = myds.FileSet;
        end
        
        % Define the isfullfile method
        function tf = isfullfile(~)
            tf = isequal(myds.FileSet.FileSplitSize,'file'); 
        end

    end
        
    methods (Access = protected)
        % If you use the  FileSet property in the datastore,
        % then you must define the copyElement method. The
        % copyElement method allows methods such as readall
        % and preview to remain stateless 
        function dscopy = copyElement(ds)
            dscopy = copyElement@matlab.mixin.Copyable(ds);
            dscopy.FileSet = copy(ds.FileSet);
        end
        
        % Define the maxpartitions method
        function n = maxpartitions(myds)
            n = maxpartitions(myds.FileSet);
        end
    end
end

%% STEP 3: IMPLEMENT YOUR CUSTOM FILE READING FUNCTION
function data = MyFileReader(fileInfoTbl)
% create a reader object using FileName
reader = matlab.io.datastore.DsFileReader(fileInfoTbl.FileName);

% seek to the offset
seek(reader,fileInfoTbl.Offset,'Origin','start-of-file');

% read fileInfoTbl.SplitSize amount of data
data = read(reader,fileInfoTbl.SplitSize);
end

此步骤完成您的自定义数据存储的实现。

接下来,使用自定义数据存储构造函数创建数据存储对象。如果您的数据位于 hdfs:///path_to_files,则可以使用以下代码。

setenv('HADOOP_HOME','/path/to/hadoop/install');
ds = MyDatastoreHadoop('hdfs:///path_to_files');

要对具有并行集群配置的 Apache® Spark™ 使用 tall 数组和 gather 函数,请设置 mapreducer 并将 MyDatastoreHadoop.m 与集群关联。

mr = mapreducer(cluster);
mr.Cluster.AttachedFiles = 'MyDatastoreHadoop.m';

基于数据存储创建 tall 数组。

t = tall(ds);

获取 tall 数组的头部。

 hd = gather(head(t));

版本历史记录

在 R2017b 中推出