Question

Je suis en train d'apprendre à utiliser l'API Python Yelp pour MapReduce, MRJob. Leur exemple simple mot contre sens, mais je suis curieux de voir comment on pourrait gérer une application impliquant plusieurs entrées. Par exemple, plutôt que de compter simplement les mots dans un document, la multiplication d'un vecteur par une matrice. Je suis venu avec cette solution, qui fonctionne, mais se sent stupide:

class MatrixVectMultiplyTast(MRJob):
    def multiply(self,key,line):
            line = map(float,line.split(" "))
            v,col = line[-1],line[:-1]

            for i in xrange(len(col)):
                    yield i,col[i]*v

    def sum(self,i,occurrences):
            yield i,sum(occurrences)

    def steps(self):
            return [self.mr (self.multiply,self.sum),]

if __name__=="__main__":
    MatrixVectMultiplyTast.run()

Ce code est exécuté ./matrix.py < input.txt et la raison pour laquelle il fonctionne est que la matrice stockée dans input.txt par des colonnes, la valeur de vecteur correspondant à la fin de la ligne.

Ainsi, la matrice et le vecteur suivant:

entrer image description ici

sont représentés comme input.txt comme:

entrer image description ici

En bref, comment pourrais-je aller sur le stockage de la matrice et vecteur plus naturellement dans des fichiers séparés et les passant à la fois dans MRJob?

Était-ce utile?

La solution

Si vous avez besoin de traiter vos données brutes contre un autre (ou même row_i, row_j) jeu de données, vous pouvez:

1) Créer un seau S3 pour stocker une copie de vos données. Passez l'emplacement de cette copie à votre classe de tâches, par exemple self.options.bucket et self.options.my_datafile_copy_location dans le code ci-dessous. Avertissement: Malheureusement, il semble que le fichier entier doit obtenir « téléchargé » aux machines de travail avant d'être traitées. Si les connexions vacille ou prend trop de temps à la charge, ce travail peut échouer. Voici un code Python / MRJob pour le faire.

Mettez dans votre fonction Mapper:

d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
    d2 = line2.split('\t', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
conn.close()

2) Créer un domaine SimpleDB et stocker toutes vos données là-dedans. Lisez ici sur boto et SimpleDB: http://code.google.com/p/boto/wiki/SimpleDbIntro

Votre code mappeur ressemblerait à ceci:

dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
sdb.close()

Cette seconde option peut effectuer mieux si vous avez de très grandes quantités de données, car il peut faire les demandes de chaque ligne de données plutôt que le montant total à la fois. Gardez à l'esprit que les valeurs SimpleDB ne peut être un maximum de 1024 caractères, de sorte que vous devrez peut-être compresser / décompresser par une méthode si vos valeurs de données sont plus que cela.

Autres conseils

La réponse réelle à votre question est que mrjob ne supporte pas encore tout à fait le Hadoop en continu rejoindre modèle, qui est de lire la variable d'environnement map_input_file (qui expose la propriété map.input.file) pour déterminer quel type de fichier que vous êtes traiter en fonction de son chemin et / ou le nom.

Vous pourriez encore être en mesure de le retirer, si vous pouvez facilement détecter de simplement lire les données elles-mêmes quel type il appartient, comme il est affiché dans cet article:

http : //allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

Mais ce ne est pas toujours possible ...

Sinon myjob a l'air fantastique et je souhaite pouvoir ajouter le support pour cela à l'avenir. Jusque-là c'est à peu près un facteur de rupture pour moi.

Voici comment j'utilise des entrées multiples et en fonction de nom de fichier apporter des changements appropriés dans la phase de cartographe.

Programme Runner:

from mrjob.hadoop import *


#Define all arguments

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/'
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
hadoop_bin = '/usr/bin/hadoop'
mode = 'hadoop'
hs = HadoopFilesystem([hadoop_bin])

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"]

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin]
aargs.extend(input_file_names)
aargs.extend(['-o',output_dir])
print aargs
status_file = True

mr_job = MRJob(args=aargs)
with mr_job.make_runner() as runner:
    runner.run()
os.environ['HADOOP_HOME'] = ''
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))

La MRJob Classe:

class MR_Job(MRJob):
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'
    def mapper(self, _, line):
    """
    This function reads lines from file.
    """
    try:
        #Need to clean email.
        input_file_name = get_jobconf_value('map.input.file').split('/')[-2]
                """
                Mapper code
                """
    except Exception, e:
        print e

    def reducer(self, email_id,visitor_id__date_time):
    try:
        """
                Reducer Code
                """
    except:
        pass


if __name__ == '__main__':
    MRV_Email.run()

Dans ma compréhension, vous ne comptez pas utiliser MrJob à moins que vous vouliez levier ou groupe Hadoop services Hadoop d'Amazon, même si l'exemple utilise en cours d'exécution sur les fichiers locaux.

MrJob dans les principales utilisations " Hadoop le streaming " pour soumettre le travail.

Cela signifie que toutes les entrées spécifiées sous forme de fichiers ou de dossiers de Hadoop est transmis en continu à mappeur et les résultats subséquents à réducteur. Tous mappeur obtient une tranche d'entrée et considère toutes les entrées être schématiquement la même manière que la clé uniforme de processus et Parsis, la valeur pour chaque tranche de données.

Découlant de cette compréhension, les entrées sont schématiquement les mêmes à la Mapper. La seule façon possible d'inclure deux données différentes schématique est de les imbriquer dans le même fichier de telle sorte que le mappeur peut comprendre ce qui est des données vectorielles et qui sont des données de la matrice.

You are actually doing it already.

Vous pouvez simplement améliorer cela en ayant une spécificateur si une ligne est des données de la matrice ou un vecteur de données. Une fois que vous voyez une des données vectorielles alors les données de la matrice précédente est appliquée.

matrix, 1, 2, ...
matrix, 2, 4, ...
vector, 3, 4, ...
matrix, 1, 2, ...
.....

Mais le processus que vous avez mentionné fonctionne bien. Vous devez avoir toutes les données schématiques dans un seul fichier.

Cela a encore des problèmes bien. K, la carte de V réduire fonctionne mieux quand schéma complet est présent dans une seule ligne et contient une seule unité de traitement complet.

Si je comprends bien, vous faites déjà correctement, mais je suppose que la carte-Reduce est pas un mécanisme approprié pour ce type de données. J'espère que certains un clarifie encore plus loin que moi.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top