Como ya mencionamos, la idea BigTable surge como necesidad de persistir grande volúmenes de información (en términos de tera / peta bytes!) en forma eficiente. Ahora esto no solo involucra el almacenamiento, sino algo igualmente importante, el procesamiento de esta cantidad masiva de datos.
Entonces acá aparece la idea de MapReduce.
MapReduce establece, básicamente, la forma de procesar los datos de forma de:
- poder dividir el trabajo y así aprovechar la idea de procesamiento paralelo.
- basado en la idea de "divide y conquistaras"
Y además, incluye la idea de que luego, tendremos que "consolidar" los resultados de esos múltiples procesamientos.
El patrón map-reduce divide la lógica de procesamiento de la información en las siguientes partes (como si estuvieramos diseñando en objetos y metemos strategies):
- Map / Mapper: contiene la lógica que va a ser invocada por cada "key1-value1", y que se encargaría de transformarla a otra tupla "key2-value2", como output de este proceso. Usamos el número "1" y "2" para mostrar la idea de que los tipos de los valores de resultado, no tienen que ser del mismo tipo que los de entrada.
- Reduce: se va a encargar de procesar una entrada de tipo "key3-value3", que sería la concentración de las salidas de las múltiples ejecuciones del Mapper. El key, va a ser los diferentes valores adoptados por "key2", es decir el output del Mapper. Pero a diferencia de este, el Reduce no se va a invocar con cada entrada "key2-value2", sino que el "framework MapReduce" va a invocarlo una única vez con cada valor de "key2", y como "value2" una lista de todos los resultados que se obtuvieron como salida del Mapper, para ese key dado.
- Framework MapReduce: se refiere al componente que coordina la ejecución completa de la lógica, e invoca al Map, y al Reduce. Realizando las combinaciones necesarias en el medio.
Este pseudo-diagrama ejemplifica las transformaciones:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
Como esto seguro que no se entendió en abstracto vamos a dar ejemplos simples de este patron, fuera de BigTable y fuera de cualquier otra tecnología.
Ejemplo:
- Tenemos una lista de entradas <nombreDocument, texto>
- Queremos, básicamente, contar las apariciones de las diferentes palabras en todos los documentos (o podría ser en un subset)
- Es decir, que queremos obtener como resultado una lista de <palabra, nro> (donde "nro" sería el número de ocurrencias)
- Podemos pensar esta lógica en términos de MapReduce de la siguiente forma:
<nombreDocument, texto -> map -> <palabra, 1>* -> combine -> <palabra,nros> -> reduce -> <palabra, nro>
Atención que hay varios detalles acá:
- map:
- lee el "texto" y por cada palabra de el, genera una salida de tipo <palabra, 1>
- Ojo acá, vean que una invocación al map, no necesariamente origina un único resultado, podría generar varias salidas, o de hecho, podría no generar ninguna.
- combine:
- combina todas las salidas generadas por los map's y genera tuplas con:
- key: la "palabra"
- value: una lista de todos los diferentes resultados "salida", del map. Es decir una lista con todos los "1" en nuestro caso.
- Ejemplo:
- [ "casa" , { 1, 1, 1}] ,
- [ "arbol", { 1 } ] ,
- [ "el" , { 1, 1, 1, 1, 1 } ].
- reduce:
- recibe como input entradas con "palabra" y todas las ocurrencias, y las consolida sumándolas.
- genera como resultado tuplas de la forma:
- [ "casa" , 3 ] ,
- [ "arbol", 1 ] ,
- [ "el" , 5 ].
Manos a la Obra: ejemplo en HBase
Veamos a ver como implementaríamos este ejemplo en java utilizando HBase como implementación de BigTable.
Vamos a crear las siguientes tablas:
documentos
{ "0" : { "detalles" : {
"texto" : "Érase una vez, hace mucho, mucho tiempo ...", }
},
"1" : { "detalles" : {
"texto" : "Un fantasma recorre Europa ...", }
}, ...
}
Donde
- row: es un id de tipo numérico para cada libro / documento. Entonces cada fila, representa un documento.
- columnFamily "detalles": nada, un único columnFamily.
- column "texto": única columna que tenemos que contiene el texto del libro.
Y por otro lado, vamos a necesitar otra tabla para guardar los resultados del MapReduce, es decir las palabras y su número de ocurrencias.
palabras
{ "Europa" : { "detalles" : {
"occurencias" : "1", }
},
"fantasma" : { "detalles" : {
"texto" : "1", }
}, ...
}
Entonces para crear esto, vamos al shell de HTable y ejecutamos:
> create 'documentos', 'detalles'
0 row(s) in 0.4120 seconds
Y ahora:
> create 'palabras', {NAME=>'detalles', VERSIONS=>1}
0 row(s) in 1.2590 seconds
Listo, ya tenemos nuestras tablas. Pero sin datos, por eso ...
Populando datos para el ejemplo
Vamos a aprovechar las capacidades de programación que nos dá el API para codificar un main() en java que popule la tabla de documentos con palabras random.
- Miramos un poquito lo que hace la clase DocumentsImporter.java
- Es básicamente código java, que genera contenido aleatorio con la clase Random.
- Genera 100.000 documentos con texto que puede ir de 0 a 6 palabras.
Lo ejecutamos entonces, y para comprobar lo que hizo vamos de nuevo al shell y hacemos un scan
> scan 'documentos', {LIMIT => 5}
ROW COLUMN+CELL
\x00\x00\x00\x00 column=detalles:texto, timestamp=1305496261528, value=azul abajo como
\x00\x00\x00\x01 column=detalles:texto, timestamp=1305496261528, value=ante
\x00\x00\x00\x02 column=detalles:texto, timestamp=1305496261528, value=elite cabe comentario ante
\x00\x00\x00\x03 column=detalles:texto, timestamp=1305496261528, value=coma
\x00\x00\x00\x04 column=detalles:texto, timestamp=1305496261528, value=a ella ella fiera azul
5 row(s) in 0.2610 seconds
(de paso vemos como limitar la cantidad de resultados con {LIMIT => 5}
Ahora si, ya podemos mirar un poco cómo está hecho el MapReduce de nuestro ejemplo, en java.
- Vemos que estas operaciones se ejecutan a través de un objeto "Job". Que tiene que ver con la ejecución sincrónica/asincrónica la distribución del trabajo entre diferentes nodos del cluster.
- Mapper:
- Vemos que se configura inicialmente con el nombre de la tabla de input, y un Scan (justo como el que vimos antes en el otro ejemplo) y con un conjunto de parámetros con la especificación de su output (los tipos)
- Vemos que hace básicamente lo que dijimos:
- obtiene el valor de la columna 'detalles:texto'
- parsea el texto a una lista de palabras
- luego, emite un ouput por cada palabra, con el número 1 como value
- Reduce:
- Vemos que se configura con el nombre de la tabla para el output.
- Esta clase es aún más clara que el Map.
- Recibe como parámetro la palabra, en formato Text y los valores en forma de Iterable<IntWritable>
- Los sumariza.
- Crea un objeto Put (como vimos en los ejemplos anteriores), para guardar el resultado.
- pone el put en el objeto context
Ejecutamos entonces este main y luego comprobamos como quedó la tabla "palabras"
> scan 'palabras', {LIMIT => 5}
ROW COLUMN+CELL
a column=detalles:ocurrencias, timestamp=1305498465221, value=9483
abajo column=detalles:ocurrencias, timestamp=1305498465221, value=9714
ante column=detalles:ocurrencias, timestamp=1305498465221, value=9564
anterior column=detalles:ocurrencias, timestamp=1305498465221, value=9676
arriba column=detalles:ocurrencias, timestamp=1305498465221, value=9898
5 row(s) in 0.0740 seconds
Vemos ahí que las palabras quedan ordenadas alfabéticamente, y podemos ver el contador.
Otras cosas interesantes para ver
- Distribución:
- Como el patron MapReduce establece ya el contrato entre sus partes, y cada una opera sobre una row dada, el mismo framework se puede encargar de distribuir nuestras clases de Map & Reduce entre varios nodos del cluster, y así ejecutar la lógica en paralelo, sobre diferentes particiones de la tabla.