我正在尝试学习将Yelp的Python API用于MapReduce,Mrjob。他们的简单词反示例很有意义,但是我很好奇一个人如何处理涉及多个输入的应用程序。例如,而不是简单地计算文档中的单词,而是将向量乘以矩阵。我想出了这个解决方案,它的功能很愚蠢:

class MatrixVectMultiplyTast(MRJob):
    def multiply(self,key,line):
            line = map(float,line.split(" "))
            v,col = line[-1],line[:-1]

            for i in xrange(len(col)):
                    yield i,col[i]*v

    def sum(self,i,occurrences):
            yield i,sum(occurrences)

    def steps(self):
            return [self.mr (self.multiply,self.sum),]

if __name__=="__main__":
    MatrixVectMultiplyTast.run()

此代码运行 ./matrix.py < input.txt 其工作原因是矩阵逐列存储在input.txt中,并在行末尾具有相应的向量值。

因此,以下矩阵和向量:

enter image description here

表示为input.txt为:

enter image description here

简而言之,我将如何自然地将矩阵和矢量存储在单独的文件中并将它们都传递到mrjob中?

有帮助吗?

解决方案

如果您需要针对另一个(或同一ROW_I,ROW_J)数据集处理原始数据,则可以:

1)创建一个S3存储桶以存储数据的副本。将此副本的位置传递给您的任务类,例如self.options.bucket and self.options.my_datafile_copy_location,在下面的代码中。警告:不幸的是,似乎整个文件必须在处理之前“下载”到任务机。如果连接步履蹒跚或加载太长,则此作业可能会失败。这是一些Python/MRJOB代码。

将其放入您的映射器函数中:

d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
    d2 = line2.split('\t', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
conn.close()

2)创建一个简单的域,然后将所有数据存储在其中。在Boto和SimpledB上阅读此处:http://code.google.com/p/boto/wiki/simpledbintro

您的映射器代码看起来像这样:

dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
sdb.close()

如果您拥有大量数据,则第二个选项可能会表现更好,因为它可以对每行数据发出请求,而不是一次全部。请记住,SimpleDB值最多只能长1024个字符,因此,如果您的数据值长,则可能需要通过某些方法来压缩/解压缩。

其他提示

您问题的实际答案是,MRJOB尚未完全支持Hadoop流连接模式,即读取MAP_INPUT_FILE环境变量(揭示Map.input.file属性)以确定您要处理的文件类型在其路径和/或名称上。

如果您可以轻松地从读取数据本身属于哪种类型,那么您仍然可以实现它,如本文所示:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-using-joins-joins-and-keys-with-python/

但是,这并不总是可能...

否则,Myjob看起来很棒,我希望他们将来可以为此增加支持。在那之前,这对我来说几乎是一个破坏者。

这就是我使用多个输入的方式,并基于文件名在映射阶段进行适当的更改。

跑步者计划:

from mrjob.hadoop import *


#Define all arguments

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/'
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
hadoop_bin = '/usr/bin/hadoop'
mode = 'hadoop'
hs = HadoopFilesystem([hadoop_bin])

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"]

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin]
aargs.extend(input_file_names)
aargs.extend(['-o',output_dir])
print aargs
status_file = True

mr_job = MRJob(args=aargs)
with mr_job.make_runner() as runner:
    runner.run()
os.environ['HADOOP_HOME'] = ''
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))

MRJOB课:

class MR_Job(MRJob):
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'
    def mapper(self, _, line):
    """
    This function reads lines from file.
    """
    try:
        #Need to clean email.
        input_file_name = get_jobconf_value('map.input.file').split('/')[-2]
                """
                Mapper code
                """
    except Exception, e:
        print e

    def reducer(self, email_id,visitor_id__date_time):
    try:
        """
                Reducer Code
                """
    except:
        pass


if __name__ == '__main__':
    MRV_Email.run()

就我的理解,即使您想利用亚马逊的Hadoop群集或Hadoop服务,即使该示例利用该示例在本地文件上运行,否则您不会使用MRJOB。

主要用途中的mrjobhadoop流“提交工作。

这意味着将所有指定为Hadoop的文件或文件夹指定的输入都流到了映射器,并将结果流到了还原器。所有映射器均获得一片输入,并认为所有输入在示意性上都是相同的,因此它均匀地解析和处理密钥,每个数据切片的值。

根据这种理解,输入在示意性地与映射器相同。包括两个不同的示意图数据的唯一方法是以同一文件的方式将它们交织在一起,以使映射器可以理解哪个是向量数据以及哪个是矩阵数据。

You are actually doing it already.

您可以简单地通过拥有一些指定符,如果线路是矩阵数据或向量数据,则可以改进它。一旦看到向量数据,就将上一个矩阵数据应用于它。

matrix, 1, 2, ...
matrix, 2, 4, ...
vector, 3, 4, ...
matrix, 1, 2, ...
.....

但是您提到的过程效果很好。您必须将所有示意图数据放在一个文件中。

不过,这仍然存在问题。当完整的架构在单行中存在并包含一个完整的单个处理单元时,K,V映射减少的工作功能更好。

从我的理解来看,您已经正确地做到了,但是我想MAP-REDUCE不是这种数据的合适机制。我希望有人能比我进一步阐明这一点。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top