質問
Yelp の MapReduce 用 Python API、MRJob の使い方を学ぼうとしています。彼らの単純な単語カウンターの例は理にかなっていますが、複数の入力を含むアプリケーションをどのように処理するか興味があります。たとえば、単に文書内の単語を数えるのではなく、ベクトルと行列を乗算します。私はこの解決策を思いつきました。これは機能しますが、愚かに思えます。
class MatrixVectMultiplyTast(MRJob):
def multiply(self,key,line):
line = map(float,line.split(" "))
v,col = line[-1],line[:-1]
for i in xrange(len(col)):
yield i,col[i]*v
def sum(self,i,occurrences):
yield i,sum(occurrences)
def steps(self):
return [self.mr (self.multiply,self.sum),]
if __name__=="__main__":
MatrixVectMultiplyTast.run()
このコードが実行されると ./matrix.py < input.txt
これが機能する理由は、行列が列ごとに input.txt に保存されており、行の末尾に対応するベクトル値があるためです。
したがって、次の行列とベクトルは次のようになります。
は、input.txt として次のように表されます。
つまり、より自然に行列とベクトルを別々のファイルに保存し、両方を MRJob に渡すにはどうすればよいでしょうか?
解決
生データを別の(または同じrow_i、row_j)データセットに対して処理する必要がある場合は、次のいずれかができます。
1)S3バケットを作成して、データのコピーを保存します。このコピーの場所をタスククラスに渡します。警告:残念ながら、処理する前にファイル全体がタスクマシンに「ダウンロード」される必要があるようです。接続が衰退するか、ロードするのに時間がかかりすぎると、このジョブが失敗する可能性があります。これを行うためのPython/Mrjobコードを次に示します。
これをマッパー関数に入れてください:
d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket) # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
d2 = line2.split('\t', 1)
v2, col2 = d2[0], d2[1]
## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
yield <your output key, value pairs>
conn.close()
2)SimpleDBドメインを作成し、そこにすべてのデータを保存します。 BotoとSimpledbでここを読んでください:http://code.google.com/p/boto/wiki/simpledbintro
あなたのマッパーコードは次のようになります:
dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
v2, c2 = item.name, item['column']
## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
yield <your output key, value pairs>
sdb.close()
この2番目のオプションは、非常に大量のデータがある場合は、完全な量ではなくデータの各行のリクエストを作成できるため、パフォーマンスが向上します。 SimpleDB値は最大1024文字しか長さでもないため、データ値がそれよりも長い場合は、何らかの方法で圧縮/減圧する必要がある場合があることに注意してください。
他のヒント
あなたの質問に対する実際の答えは、mrjob が Hadoop ストリーミング結合パターンをまだサポートしていないということです。これは、map_input_file 環境変数 (map.input.file プロパティを公開する) を読み取り、どのタイプのファイルに基づいて処理しているかを判断することです。そのパスや名前に。
この記事に示されているように、データ自体を読み取るだけでデータがどのタイプに属しているかを簡単に検出できる場合は、まだ成功する可能性があります。
ただし、それが常に可能なわけではありません...
それ以外の場合、私のジョブは素晴らしく、将来的にはこれに対するサポートを追加できることを願っています。それまでは、これは私にとってかなり大きな問題です。
これは、複数の入力を使用する方法であり、ファイル名に基づいてマッパーフェーズで適切な変更を加えます。
ランナープログラム:
from mrjob.hadoop import *
#Define all arguments
os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/'
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
hadoop_bin = '/usr/bin/hadoop'
mode = 'hadoop'
hs = HadoopFilesystem([hadoop_bin])
input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"]
aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin]
aargs.extend(input_file_names)
aargs.extend(['-o',output_dir])
print aargs
status_file = True
mr_job = MRJob(args=aargs)
with mr_job.make_runner() as runner:
runner.run()
os.environ['HADOOP_HOME'] = ''
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))
Mrjobクラス:
class MR_Job(MRJob):
DEFAULT_OUTPUT_PROTOCOL = 'repr_value'
def mapper(self, _, line):
"""
This function reads lines from file.
"""
try:
#Need to clean email.
input_file_name = get_jobconf_value('map.input.file').split('/')[-2]
"""
Mapper code
"""
except Exception, e:
print e
def reducer(self, email_id,visitor_id__date_time):
try:
"""
Reducer Code
"""
except:
pass
if __name__ == '__main__':
MRV_Email.run()
私の理解では、例としてローカルファイルで実行されている場合でも、AmazonのHadoopクラスターまたはHadoopサービスを活用したくない限り、Mrjobを使用しません。
主要なMrjobは使用しています」Hadoopストリーミング「仕事を提出する。
これは、Hadoopからファイルまたはフォルダーとして指定されたすべての入力がマッパーにストリーミングされ、その後の結果がレデューサーにストリーミングされることを意味します。すべてのマッパーは入力のスライスを取得し、すべての入力が概略的に同じであると見なされるため、各データスライスのキー、値を均一に解析および処理します。
この理解から派生して、入力はマッパーと概略的に同じです。 2つの異なる概略データを含めることができる唯一の方法は、マッパーがどのベクトルデータであり、どのマトリックスデータであるかを理解できるように、同じファイルでそれらをインターリーブすることです。
You are actually doing it already.
行がマトリックスデータまたはベクトルデータである場合、いくつかの指定子を持つことで、それを単純に改善できます。ベクトルデータが表示されると、前のマトリックスデータが適用されます。
matrix, 1, 2, ...
matrix, 2, 4, ...
vector, 3, 4, ...
matrix, 1, 2, ...
.....
しかし、あなたが言及したプロセスはうまく機能します。単一のファイルにすべての概略データを使用する必要があります。
しかし、これにはまだ問題があります。 K、Vマップ削減は、完全なスキーマが単一の行に存在し、完全な単一の処理ユニットが含まれている場合、より良く動作します。
私の理解から、あなたはすでにそれを正しく行っていますが、Map-Reduceはこの種のデータに適したメカニズムではないと思います。誰かがこれを私ができるよりもさらに明確にすることを願っています。