Data Transformation Pipelines (Apache Nifi, Flink, Kafka y MongoDB) (Parte 1)

Data Transformation Pipelines (Apache Nifi, Flink, Kafka y MongoDB) (Parte 1)

Ya en anteriores artículos hemos explicado la importancia de tratar con datos organizados ya que el Big Data como tal, ¡no sirve de nada!. Para ello existen los llamados procesos de transformación (Data Transformation) que consisten, entre otros, a convertir ese Big Data en un Smart & Small Data Concept. Para ello podemos utilizar muchas tecnologías, ya anteriormente hemos hablado de FluentD o bien del Stack ELK (Elasticsearch + Logstash + Kibana) como buenas soluciones para poder crear flows de Data Transformation, mejor dicho, Flows envío y agregación de Datos.

Data Transformation Pipeline

Hoy hablaremos de la fusión entre Apache NiFi, Apache Flink, Apache Kafka y MongoDB como sistema de ingesta, procesamiento y almacenamiento de mensajes. Primero, una breve presentación de cada componente, indicar que los tres primeros, son proyectos de la Apache Software Foundation:

  • Apache NiFi permite la automatización de flujos de datos entre sistemas. Se puede describir como el Sistema de logística de los datos ya que Apache NiFi ayuda a mover y rastrear los datos. El proyecto está escrito utilizando la programación basada en flujos y proporciona una interfaz de usuario basada en web para gestionar dichos flujos de datos en tiempo real.

  • Apache Flink es un marco de procesamiento de flujos de datos para aplicaciones de flujo de datos distribuidas, de alto rendimiento, siempre disponibles y precisas.

  • Apache Kafka se utiliza para construir Pipelines de datos en tiempo real como, también, aplicaciones basadas en sistemas streaming. Es escalable horizontalmente, tolerante a fallos y muy rápido.

  • MongoDB es la base de datos NoSQL más conocida y podemos decir que dota a nuestros proyectos y/o necesidades el ser más ágiles y escalables. Es perfecta para infraestructuras cloud y la convierten en la plataforma perfecta para datos de críticos.

Objetivo del artículo: Transformar Big Data en Smart & Small Data

Bien, presentadas las piezas de la Solución (hay muchas más), vamos a dar paso al puzzle que forman todas ellas. Como se ve en la imagen que precede éste artículo, tenemos un origen de datos (Data Producer) y queremos conectar, tras un proceso de transformación, con un destino o mejor dicho, consumo y visualización de esos datos. Siguiendo la premisa de: Transformar el Big Data en Smart & Small Data.

En esta primera parte, realizaremos un ejercicio, para poder explicar la Pipeline de Data Transformation, donde a partir de una Fuente de Datos, en nuestro caso Twitter (uno de los grandes símbolos del BigData), realizaremos su aprovisionamiento y carga de los datos, con una mínima categorización y lo almacenaremos en un Broker y Base de Datos (al mismo momento) para que pueda ser consumido por otra aplicación (consumidor). Véase la imágen siguiente:

Data Transformation Pipeline - Step 1

Instalación y Configuración de Apache NiFi

Bien, la primera de las piezas será Apache NiFi, que tendrá el Rol de receptor de los mensajes es Nifi y a su vez, de escuchar, formatear y primer filtro. La idea es utilizar NiFi como receptor de los mensajes, tener un mínimo tratamiento/categorización y pasarlos al siguiente Step del proceso.

Aunque, el primer paso será instalar y configurar Apacke Kafka, y para ello podemos seguir el artículo, publicado anteriormente, donde se explica paso a paso como poder realizarlo.

NOTA: Apache NiFi requiere de JAVA (Anterior a la versión 1.0 se requiere JAVA 8 o superior), en este caso hemos instalado el entorno de ejecución de Java (JRE) mediante el command: apt-get install default-jre aunque si se desea el entorno JDK apt-get install default-jdk. Hay un buen artículo para los que siempre tenemos problemas instalando JAVA.

Para realizar la instalación de Apache NiFi, debemos realizar estos pasos siguientes:

cd /opt

#Para JAVA 8 o superior
wget https://archive.apache.org/dist/nifi/1.3.0/nifi-1.3.0-bin.tar.gz
#Para JAVA 7
wget https://archive.apache.org/dist/nifi/0.7.4/nifi-0.7.4-bin.tar.gz

tar xvzf nombre-fichero-version-nifi.tar.gz

ln -s nifi-X.X.X nifi
cd nifi
bin/nifi.sh install dataflow
bin/nifi.sh start
bin/nifi.sh status

Si todo va bien y, a partir de un navegador web, con la URL siguiente: http://URL-HOST-APACHE-NIFI:8090/nifi/ podremos ver el Dashboard de management de Apache NiFi.

Dashboard Apache NiFi

Construcción de un Flow de Carga de Datos con Apache NiFi

Una vez accedemos al Dashboard de Apache NiFi, vamos a realizar los pasos para configurar nuestro Flow de carga de datos. Los voy enumerando a continuación:

Añadiendo procesador “GetTwitter”

  1. Añadiremos un procesador, lo podemos localizar en la barra superior del menú, el primero de los iconos. Arrastraremos el icono hacia la cuadrícula central para poder crearlo.
  2. Para nuestro ejercicio usaremos el procesador de Twitter, podemos filtrar y encontraremos el tipo “GetTwitter”. Lo añadiremos (Add).
  3. Para poder configurar el procesador, haremos botón derecho y seleccionaremos “Configure”, esto nos abrirá una nueva ventana para poder introducir las configuraciones personalizadas.
  4. En la pestaña “Properties” podremos introducir los campos “Consumer Key”, “Consumer Secret”, “Access Token” y “Access Token Secret” que a partir de Twitter Application podremos conseguir. Los introducimos.
  5. Sin salir de la pestaña “Properties”, deberemos seleccionar, de la lista desplegable, la opción Sample Endpoint en la fila “Twitter Endpoint”. Si queremos realizar algún tipo de filtro sea por IDs de usuario, localización, etc… seleccionaremos la opción “Filter Endpoint”.
  6. Para finalizar pulsaremos el botón “Apply”.

Añadiendo procesador “EvaluateJsonPath”

Una vez de vuelta al Dashboard principal, añadiremos un nuevo procesador, filtrando por “evaluate” y seleccionando el tipo “EvaluateJsonPath”.

  1. Una vez añadido el nuevo procesador “EvaluateJsonPath”, en la pestaña “Properties” en la primera fila “Destination” seleccionaremos a partir del desplegable: “flowfile-attribute”.
  2. Ahora añadiremos dos nuevas propiedades relacionadas con los campos text y lang del JSON. En la parte superior derecha pulsaremos el botón “New property”, asignando el nombre “twitter.lang” y su valor “$.lang” a la nueva propiedad. Y, de nuevo, los mismo pero para “twitter.text” con el valor “$.text”.
  3. Antes de terminar esta configuración, en la pestaña “Settings” activaremos dos de los tres flasg que se encuentran en la derecha: “failure” y “unmatched”.

Ya creados los dos procesadores, ahora vamos a conectarlos. Simplemente con posicionarnos con el mouse sobre el primero, nos aparecerá un icono con una flecha, lo seleccionamos y arrastramos sin dejar de hacer click hacia el segundo procesador, para conectarlos, ahora ya podemos dejar de hacer click.

En el monento de la unión, nos aparecerá una nueva ventana. En ella y concretamente en la pestaña “Settings” vamos a seleccionar la política que queremos aplicar, arrastraremos entonces la opción “FirstInFirstOutPrioritizer” de la lista de “Available prioritizers” (derecha superior) hacia el al área “Selected Prioritizers” (derecha inferior). Añadimos (Add).

Añadiendo procesador “RouteOnAttribute”

No hemos terminado todavía, nos falta añadir la relación “matched” para el procesador “EvaluateJsonPath” para ello añadiremos un nuevo procesador. Aunque en esta ocasión filtraremos por “route” y seleccionando la opción “RouteOnAttribute”. Añadiremos (Add).

  1. Una vez creado el procesador, vamos a configurarlo (como los anteriores). Le asignaremos un nombre si queremos y activamos el flag “unmatched” de la parte derecha.
  2. Ya en “Properties” añadimos una “New property”, le ponemos un nombre y añadimos el valor: “${twitter.text:isEmpty():not():and(${twitter.lang:equals(“es”)})}“. Con esto conseguimos filtrar aquellos tweets que estén vacíos y cuyo idioma no haya sido marcado como español. Añadimos (Add) y Aplicamos (Apply).

Si bien es cierto, este último paso también lo hubiésemos podido realizar al principio, en el momento de configurar el procesador “getTweets”.

Ya casi como último paso, procederemos a conectar el procesador “EvaluateJsonPath” con el de “RouteOnAttribute”. Ya en la nueva ventana, en la pestaña “Details” tendremos la opción “For relationships”. Activaremos el flag “matched”. En la pestaña “Settings” volveremos a indicar la política que queremos aplicar, será la misma que en el caso anterior: “FirstInFirstOutPrioritizer” y Añadiremos (Add).

Añadiendo procesador “PutKafka” & “PutMongo”

Ya como último paso, ahora sí, vamos a añadir ahora un cuarto y último procesador. En esta ocasión filtramos por “kafka” y seleccionamos la opción “PutKafka”.

  1. Una vez creado el procesador, como en las anteriores ocasiones, le asignamos un nombre y activamos los dos flags de la opción “Auto termination relationships”. Este ya será el final de nuestro proceso.
  2. En la pestaña de “Properties” configuraremos los campos “Known Brokers”, “Topic” y “Client Name” ya usando la configuración del Kafka que tenemos desplegado. Añadimos (Add) y Aplicamos (Apply).

Realizamos la última de las conexiones entre procesadores, correspondiente a “RouteOnAttribute” y “PutKafka”. Activaremos el flag “tweet”. En la pestaña “Settings” volveremos a indicar la política que queremos aplicar, será la misma que en el caso anterior: “FirstInFirstOutPrioritizer” y Añadiremos (Add).

Si todo va bien deberíamos tener algo parecido a la siguiente imagen:

Procesadores Apache NiFi

Para MongoDB haremos lo mismo, pero filtrando por “mongo” y realizaremos las conexiones a nuestra BD. En la Documentación de Apache NiFi podemos encontrar una entrada dedicada a MongoDB, es interesante darle lectura ya que algunos parámetros son específicos.

En un próximo artículo haremos la Parte 2, donde realizaremos mediante Apache Flink la Transformación del Dato (tweet) para adaptarlo a nuestras necesidades.

Autor: Joakim Vivas

comments powered by Disqus