Replicación y Formas de Paralelización Apache Hadoop

Hasta este post debemos tener idea de como funciona Hadoop, YARN, Sqoop, Hive y Zookeeper, ahora intentaré caracterizar que es un clúster y como es que se lleva a cambio un procesamiento en paralelo, esto nos permitirá realizar otros dos programas antes de dar por terminada la parte básica de Big Data y movernos a algo más complejo y avanzado, como hemos visto el MapReduce es la forma de paralelización que surge con esta tecnología y así mismo es la primera forma de realizar un procesamiento en paralelo, pero al final un solo se mejora su efectividad en el procesamiento de los datos.

Ahora trataré de formalizar todo a lo que llamamos clúster, junto con su manera de procesar la información para poder realizar los siguientes programas.



Empezaré definiendo el concepto de rack y de clúster.


Un ejemplo gráfico de lo anterior podría ser la siguiente imagen:

Con lo anterior definido, tenemos que empezar a caracterizar algunas otras propiedades de un clúster, y continuaré por comentar algo que es importante en un clúster de Hadoop, aún no hablaré de este tema ya que será el desarrollo de toda una explicación dentro de dos post siguientes sin contar este, pero ya en aplicaciones distribuidas tenemos que estar cocientes de que hay puntos vulnerables al ser distribuido y uno que nos concierne en Apache Hadoop, es la Tolerancia a Particiones, como comenté, no entraré en detalles de estas características así que solo las dejaré como mención.

Entonces al ser un tema de preocupación para nuestro análisis, ¿Cómo es que lidiamos con la Tolerancia a Particiones?, bueno Hadoop cuenta con un Algoritmo llamado Rack Awareness Algorithm que es el encargado de las replicas de información de un bloque, a continuación describiré como es que funciona el algortimo, es un algoritmo diseñado a la Tolerancia a Particiones.

Cómo mencione en el post de Teoría de Apache Hadoop, el factor de replicación de Apache Hadoop por default es 3 y Apache Hadoop esta diseñado para dividir archivos muy grandes entre bloques más pequeños, que por default son bloques de 128MB y como también ya mencione el que tiene el control total de como se maneja la información es el Namenode, pero veamos como es que el namenode toma las decisiones que toma al distribuir la información.

Empezaremos con un ejemplo como el siguiente, imaginemos que tenemos un archivo de 250 MB, nosotros sabemos que esa información la dividirá en dos bloques uno en un bloque b1 de 128 MB y otro bloque b2 de 122 MB, ahora tal vez te estés preguntando, ¿Por qué el segundo bloque tiene un tamaño de 122 MB? Muy sencillo, Apache Hadoop realiza particiones disjuntas de información sobre nuestro archivo principal, entonces en este caso como hace particiones de 128 MB, tenemos que la siguiente operación 250 MB - 128 MB = 122MB, por lo que el segundo bloque debe tener el tamaño de la información restante que es 122 MB, en la imagen siguiente se representa lo anterior de forma gráfica.



Una vez entendida la forma en que se realiza una partición de cualquier tipo de información por Apache Hadoop daré la siguiente definición formal del proceso anterior.

De la manera anterior podemos definir de forma abstracta el particionamiento de un archivo o de cualquier ente que este construido de información (videos, audio, etc).

Ahora procederé a caracterizar el algoritmo Rack Awareness que sirve para la replicación de los bloques en todo el clúster.

Pero antes de tratar de generar una caracterización de dicho proceso explicaré como es que funciona para tener la noción de lo que hace este algoritmo con las siguientes palabras.


  1. Guardar la primera replica donde sea, usualmente un nodo aleatorio (si el cliente se encuentra fuera del clúster), o guardar la primera replica en el nodo donde el cliente ha mandado la petición (si el cliente pertenece al clúster).
  2. Poner la segunda replica en un rack diferente al que se ha puesto la primera replica.
  3. Poner la tercera replica en el mismo rack que el del punto 2, pero en un nodo diferente.
  4. Si hay más replicas que las 3 que son por default deberán ponerse en racks diferentes a los que ya usamos.
Los pasos anteriores lucirían de forma gráfica como la siguiente imagen, podemos ver que el cliente que pide la replicación para el Bloque A, se encuentra en el RACK 2 y en el nodo 7, el es quién contiene la información principal y sus otras 2 replicas viven en el RACK 3 en los nodos (nodo 10 y nodo 11).


Y tal vez te estés preguntando, ¿Qué pasa si un nodo se muere? o peor aún, ¿Si un rack se muere?, bueno el algoritmo rack awareness lo tiene controlado, supongamos que se muere el nodo 11 de acuerdo a nuestras reglas anteriores, un rack debe de tener a lo más 2 replicas del mismo bloque en diferentes nodos, entonces el namenode es el encargado de monitorear esta falla e inmediatamente realizar una copia de todos las replicas que contenga este nodo en otro nodo(s) disponible en este rack, como se muestra en la siguiente imagen.

El mismo principio funciona cuando hay una ausencia de uno o más racks, siempre y cuando no sea perdida total.

Con el algoritmo de rack awareness se cumplen algunas garantías como las siguientes:
  1. Mejora el funcionamiento de la red: Como ya vimos la comunicación entre racks es por medio de los switch de cada rack. En general tendremos mayor ancho de banda entre las máquinas que pertenecen a un mismo rack, que entre diferentes máquinas de racks distintos, cosa que se formalizará más abajo. Este algoritmo optimiza la escritura y lectura reduciendo el tiempo y el tráfico entre las máquinas del clúster.
  2. Previene la perdida de información: No nos tenemos que preocupar de la perdida de información, incluso si un rack completo se llega a caer como lo mencione anteriormente o por alguna falla del swtich que conecta los racks, este algoritmo viene de el dicho que "nunca pongas todos los huevos en una misma caja".
Ahora con el algoritmo ya descrito de forma práctica, ahora que entendemos como es que se realiza una replicación, procederé a explicar el proceso detalladamente de lectura en HDFS, sobre un clúster, esto es necesario para que en nuestra parte final definamos de manera formal el procesamiento en paralelo y podamos realizar dos ejemplos antes de pasar a cosas un poco más complejas y entretenidas.

Lectura de HDFS.

Supongamos que tenemos el siguiente de la imagen anteriormente utilizada, entonces este archivo se divide en 2 bloques y por default tiene un factor de replicación de 3.


Ahora supongamos que tenemos un namenode en un nodo afuera y que el cliente se encuentra en otra máquina afuera del clúster, entonces el cliente pide para leer la información correspondiente a la imagen azul entonces listaré lo que hace el namenode para regresar la información para su lectura.

  • El cliente tratará de ver comunicarse con el namenode preguntando por los bloques de metadata para el archivo "informacion".
  • El namenode regresará la lista de datanodes donde cada bloque (Block A and B) se encuentra almacenada, cabe destacar que esta lista lo hace en base a que bloques se encuentran más cerca del cliente, esto reduce la latencia en la lectura y reduce el consumo del ancho de banda de nuestra red.
  • El cliente se conectará a los datanodes donde la información se encuentra almacenada.
  • El cliente comenzará a leer la información en paralelo  desde cada datanode (Block A de datanode 1 y Block B de datanode 3).
  • Una vez que el cliente obtiene todos los bloques de el archivo solicitado, el cliente empezará a combinar todos los bloques en uno solo, para regresar a un solo archivo con toda la información correspondiente.
Este proceso lo muestra la siguiente ilustración:


Una vez que hemos llegado hasta este punto, entendemos como se realizan las replicas en el clúster y como es que un cliente accede a los bloques de información, ahora, el encargado de asignar recursos para el procesamiento es el YARN, pero funciona bajo el mismo concepto que el de la ilustración anterior el manda el código para procesamiento de la información en los nodos del clúster que contengan la información más próxima al cliente, aunque el es el único encargado de si procesar los bloques en 1,2 ó n nodos disponibles de nuestro clúster.

La definición siguiente, simplemente es la definición de un espacio métrico y una métrica.
Y se preguntarán, ¿Para que sirve lo anterior?, bueno déjenme mostrarles, que pasaría si tomáramos el siguiente ente Matemático.

Cómo podemos ver, pareciera ser que la arquitectura de hadoop de replicación y lectura tiene forma de un espacio métrico y es que así debería ser, pero nuestra métrica p, no cumple con la condición 1 y 2 de las propiedades de una métrica, ya que la menor unidad de tiempo de un ping es 1 milisegundo, pero esto se puede solucionar definiendo una métrica equivalente y trabajando con esa métrica.



Entonces ahora podemos trabajar con la métrica p' y es que con esto podemos modelar perfectamente la forma en que se leen los bloques y la forma en la que se asignan los procesos y por que es que lo digo, supongamos que queremos leer un archivo, entonces los bloques se podrían elegir de la siguiente manera.


Una vez que hemos llegado hasta este punto, se caracterizara el procesamiento en paralelo, para mi existen dos formas de procesamiento las cuales describiré a continuación.

Supongamos que tenemos el archivo de la imagen anterior que pesa 250 MB y tiene el contenido de la derecha, ahora bien, por lo anterior visto sabemos que hadoop lo dividirá en dos bloques de 128 MB y uno de 122 MB, supongamos que los bloques son los siguientes.
Ahora como podemos ver, si deseamos contar el número de palabras como el wordcount en este texto, hadoop irá a buscar las replicas de estos bloques cercanas a la máquina(cliente) que realizo la petición y procesara en dos nodos el mismo código con diferentes bloques de informción el A y B, hasta aquí todo esta muy bien, pero que pasaría si quisiéramos realizar un análisis de sentimiento en el texto completo, este tipo de procesamiento no nos seria de utilidad, ya que analizar el texto por separado no nos daría un resultado muy exacto, entonces podrá pensar, ¿no todo es paralelizable?, la respuesta es si, pero podemos utilizar la bondad del procesamiento en paralelo, por que en lugar de solo analizar un texto,  podremos calcular el sentimiento de n textos al mismo tiempo, donde n es el número de nodos con el que cuenta nuestro clúster, entonces como podemos notar, existen dos formas de paralelización, las denominaré la forma natural y la forma no natural. Una vez explicado esto daré una caracterización de estas formas de procesamiento.

Si nuestra función cumple con estas características podremos decir que nuestro proceso es una paralelización trivial.

En caso de que la propiedad número 5 no sé cumpla, podremos afirmar que existe una paralelización del tipo no trivial. 

Entonces gracias al algoritmo de Rack Awareness, podemos definir la máquina que procesara la información de la siguiente manera.



Ahora una vez que sabemos que máquinas regresaran la información usualmente estas son las mismas que procesan la misma, donde en caso que realice una paralelización del tipo trivial deberá cumplir con todos los puntos de la Definición 1.4, en caso de no ser así podrá realizarse una paralelización del tipo no trivial donde necesitaremos I completamente para aplicarle cualquier transformación que deseemos al conjunto de información.

Una vez explicada esta teoría  en los próximos 3 posts se realizaran ejemplos de paralelización para dar por terminado esta parte básica de Big Data y pasar a algunos conceptos más avanzados y más interesantes para tener más herramientas para este mundo del manejo de grande volúmenes de información.

Comentarios

  1. una consulta ¿como se comporta la distribución de datos con replica mayor a 3 en los rack? (ej 4 o 5)
    2 + 1 + 1 o 2 + 2 + 0

    ResponderBorrar
    Respuestas
    1. claro, para empezar no recuerdo si lo puse en el blog, pero el número de racks no debe ser mayor al número de replicas definido, una vez sabiendo esto entonces jamás deberíamos tener el caso p < r, donde p es el número de replicas y r el número de racks.

      Otra restricción es que nunca debe haber más de 2 replicas en un rack, por lo que para tu ejemplo para p=4, tendríamos 3 escenarios validos r=2, r=3 y r=4, pero en los 3 casos se distribuirían de la misma manera.

      Tendríamos que en dos racks diferentes habría 2 replicas y esas dos replicas estarían en nodos diferentes del rack.

      para el caso en que p=5 entonces en este caso ya utilizaríamos 3 racks diferentes y así seguirían las combinaciones, pero siendo analíticos p=3 es el caso idóneo y perfecto como número de replicas y garantizar la alta disponibilidad.

      Borrar

Publicar un comentario

Entradas más populares de este blog

Manejo Apache Hive

Asignación de un líder Zookeeper