Pull data from Twitter and push data to Elasticsearch using Apache NiFi

Pull data from Twitter and push data to Elasticsearch using Apache NiFi

En este post vamos a realizar un Workflow de conexión a Twitter, a partir de su API, para poder guardarlos, posteriormente, en un Elasticsearch mediante Apache NiFi.

NiFi Twitter API to Elasticsearch

La estructura del Workflow es muy simple, tendrá cuatro pasos, los siguientes:

  • Descargar los tweets mediante la conexión a la API de Twitter.
  • Estructurar el fichero de salida de la API a un tipo JSON.
  • Enrutar el fichero al destino aplicando un filtro (opcional).
  • Guardar el contenido del JSON en un Elasticsearch.

Aunque, primero deberemos arrancar nuestro entorno de trabajo, compuesto de dos únicos componentes: Apache NiFi y Elasticsearch. Será muy simple ya que lo haremos mediante Docker a partir de estos commands:

Arrancar Dockers con Apache NiFi y Elasticsearch

Arrancaremos primero el contenedor Docker con Elasticsearch:

sudo docker run -d --name elastic -p 9200:9200 -p 9300:9300 elasticsearch

Seguidamente y es opcional, levantaremos Kibana para poder controlar más visualmente a Elasticsearch. En este caso pasaremos como parámetro de configuración la URL de Elasticsearch: -e ELASTICSEARCH_URL=http://IP-HOST-DOCKER-ELASTIC:9200

sudo docker run -d --name kibana -p 5601:5601 -e ELASTICSEARCH_URL=http://IP-HOST-DOCKER-ELASTIC:9200 kibana

Una vez levantados los Docker con Elasticsearch y Kibana, procederemos a levantar Apache NiFi:

sudo docker run -d --name kibana -p 8080:8080 apache/nifi

Una vez finalizado, ya podremos empezar a construir el Workflow mediante Apache NiFi. Accederemos a partir de la URL: http://IP-HOST-DOCKER-NIFI:8080

GetTwitter

Ya dentro de Apache NiFi, vamos a construir un procesador, utilizaremos el llamado “GetTwitter” y lo configuraremos muy fácilmente. Si bien es cierto, deberemos tener, antes, unos datos de la API de Twitter, los siguientes:

  • Consumer Key (API Key)
  • Consumer Secret (API Secret)
  • Access Token
  • Access Token Secret

El siguiente paso será configurar las propiedades del procesador, donde deberemos introducir tanto los datos anteriores como el filtro (Terms to Filter On) que querramos, por ejemplo un hashtag o palabra clave. Podemos verlo en la siguiente imagen de forma resumida:

NiFi GetTwitter

Transform to JSON Format

Seguidamente, miraremos de transformar la información que nos devuelve la API de Twitter en un fichero JSON, en este caso, miraremos de clasificar a la vez tanto la lengua del tweet como el própio contenido del texto. Para ello añadiremos el procesador llamado “EvaluateJsonPath” y lo configuraremos con las siguientes propiedades, podemos verlo en la siguiente imagen de forma resumida también:

NiFi EvaluateJsonPath

Route and Attribute

Ahora, una vez ya le hemos dado un formato más estructurado a los datos descargados de Twitter, vamos a mirar de hacer el routing hacia Elasticsearch añadiendo un nuevo procesador llamado “RouteOnAttribute” con la propiedad, por ejemplo: el filtrar por lengua y por contenido. Básicamente aprovecharemos la clasificación añadida anteriormente. Veamos entonces la imagen para entenderlo mejor:

NiFi RouteOnAttribute

Put to Elasticsearch

Y ya por último, añadiremos el procesador llamado “PutElasticsearchHttp” que será el que hará el Put de datos a Elasticsearch. Aquí establaceremos la propiedads siguientes:

  • Elasticsearch URL: será la URL + puerto de comunicación.
  • Index: el índice de Elasticsearch que usaremos.
  • Type: default

Todas las otras propiedades las podemos dejar por defecto. Veamos la imagen:

NiFi PutElasticsearchHttp

Conectar los procesadores

Ahora ya último paso de todo será conectar todos los procesadores. Deberemos establecer el routing tanto de “Failure”, “Retry” y “Success”.

Visualizar los datos introducidos a Elasticsearch mediante kibana

Tras poner en marcha el Workflow, podremos dirigirnos a Kibana para poder visualizar la entrada de los datos y ya podremos dar por finalizado la construcción de esta carga de datos.

NiFi Kibana

Autor: Joakim Vivas

comments powered by Disqus