Pregunta

Estoy tratando de aprender a usar la API Python de Yelp para MapReduce, Mrjob. Su simple ejemplo de contador de palabras tiene sentido, pero tengo curiosidad por saber cómo se manejaría una aplicación que involucra múltiples entradas. Por ejemplo, en lugar de simplemente contar las palabras en un documento, multiplicando un vector por una matriz. Se me ocurrió esta solución, que funciona, pero se siente tonta:

class MatrixVectMultiplyTast(MRJob):
    def multiply(self,key,line):
            line = map(float,line.split(" "))
            v,col = line[-1],line[:-1]

            for i in xrange(len(col)):
                    yield i,col[i]*v

    def sum(self,i,occurrences):
            yield i,sum(occurrences)

    def steps(self):
            return [self.mr (self.multiply,self.sum),]

if __name__=="__main__":
    MatrixVectMultiplyTast.run()

Este código se ejecuta ./matrix.py < input.txt y la razón por la que funciona es que la matriz almacenada en input.txt por columnas, con el valor del vector correspondiente al final de la línea.

Entonces, la siguiente matriz y vector:

enter image description here

se representan como entrada.txt como:

enter image description here

En resumen, ¿cómo podría almacenar la matriz y el vector más naturalmente en archivos separados y pasarlos a ambos a Mrjob?

¿Fue útil?

Solución

Si necesita procesar sus datos sin procesar en otro conjunto de datos (o mismo Row_i, Row_J), puede:

1) Cree un cubo S3 para almacenar una copia de sus datos. Pase la ubicación de esta copia a su clase de tareas, por ejemplo, self.options.bucket y self.options.my_datafile_copy_location en el código a continuación. Advertencia: Desafortunadamente, parece que todo el archivo debe "descargarse" a las máquinas de tareas antes de procesarse. Si las conexiones vacilan o tarda demasiado en cargarse, este trabajo puede fallar. Aquí hay algún código Python/MrJob para hacer esto.

Pon esto en tu función mapper:

d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
    d2 = line2.split('\t', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
conn.close()

2) Cree un dominio SimpledB y almacene todos sus datos allí. Lea aquí en Boto y SimpledB:http://code.google.com/p/boto/wiki/simpledbintro

Su código mapeador se vería así:

dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
sdb.close()

Esta segunda opción puede funcionar mejor si tiene grandes cantidades de datos, ya que puede realizar las solicitudes de cada fila de datos en lugar de la cantidad completa a la vez. Tenga en cuenta que los valores de SimpledB solo pueden tener un máximo de 1024 caracteres, por lo que es posible que deba comprimir/descomprimir a través de algún método si sus valores de datos son más largos que eso.

Otros consejos

La respuesta real a su pregunta es que MRJOB aún no admite el patrón de unión de transmisión de Hadoop, que es leer la variable de entorno MAP_INPUT_FILE (que expone la propiedad MAP.Input.Input.file) para determinar con qué tipo de archivo está tratando basado en su camino y/o nombre.

Es posible que aún pueda lograrlo, si puede detectar fácilmente solo leer los datos en sí mismo a qué tipo pertenece, como se muestra en este artículo:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using- joins-and-keys-with-python/

Sin embargo, eso no siempre es posible ...

De lo contrario, MyJob se ve fantástico y desearía que pudieran agregar apoyo para esto en el futuro. Hasta entonces, esto es más o menos un factor decisivo para mí.

Así es como uso múltiples entradas y, en base al nombre de archivo, haz cambios adecuados en la fase mapeador.

Programa de corredores:

from mrjob.hadoop import *


#Define all arguments

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/'
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
hadoop_bin = '/usr/bin/hadoop'
mode = 'hadoop'
hs = HadoopFilesystem([hadoop_bin])

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"]

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin]
aargs.extend(input_file_names)
aargs.extend(['-o',output_dir])
print aargs
status_file = True

mr_job = MRJob(args=aargs)
with mr_job.make_runner() as runner:
    runner.run()
os.environ['HADOOP_HOME'] = ''
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))

La clase Mrjob:

class MR_Job(MRJob):
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'
    def mapper(self, _, line):
    """
    This function reads lines from file.
    """
    try:
        #Need to clean email.
        input_file_name = get_jobconf_value('map.input.file').split('/')[-2]
                """
                Mapper code
                """
    except Exception, e:
        print e

    def reducer(self, email_id,visitor_id__date_time):
    try:
        """
                Reducer Code
                """
    except:
        pass


if __name__ == '__main__':
    MRV_Email.run()

En mi lugar, no usaría MRJOB a menos que quisiera aprovechar los servicios de Hadoop Cluster o Hadoop de Amazon, incluso si el ejemplo utiliza en ejecución en archivos locales.

Mrjob en los usos principales "Transmisión de Hadoop"Para enviar el trabajo.

Esto significa que todas las entradas especificadas como archivos o carpetas de Hadoop se transmiten al mapeador y los resultados posteriores al reductor. Todo el mapeador obtiene una porción de entrada y considera que todas las entradas son esquemáticamente la misma para que analice de manera uniforme y procese la clave, valor para cada corte de datos.

Derivando de este entendimiento, las entradas son esquemáticamente las mismas para el mapeador. La única forma posible de incluir dos datos esquemáticos diferentes es entrelazarlos en el mismo archivo de tal manera que el mapeador pueda comprender cuál son los datos vectoriales y cuáles son los datos de matriz.

You are actually doing it already.

Simplemente puede mejorar eso teniendo algunos especificadores si una línea es datos de matriz o datos vectoriales. Una vez que ve los datos vectoriales, los datos de matriz anteriores se aplican a él.

matrix, 1, 2, ...
matrix, 2, 4, ...
vector, 3, 4, ...
matrix, 1, 2, ...
.....

Pero el proceso que ha mencionado funciona bien. Debe tener todos los datos esquemáticos en un solo archivo.

Sin embargo, esto todavía tiene problemas. K, V El mapa Reduce funciona mejor cuando el esquema completo está presente en una sola línea y contiene una unidad de procesamiento único completo.

Según tengo entendido, ya lo está haciendo correctamente, pero supongo que MAP-Reduce no es un mecanismo adecuado para este tipo de datos. Espero que alguien aclare esto aún más de lo que pude.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top