En este post vamos a realizar un Workflow de conexión a Twitter, a partir de su API, para poder guardarlos, posteriormente, en un Elasticsearch mediante Apache NiFi.
La estructura del Workflow es muy simple, tendrá cuatro pasos, los siguientes:
- Descargar los tweets mediante la conexión a la API de Twitter.
- Estructurar el fichero de salida de la API a un tipo JSON.
- Enrutar el fichero al destino aplicando un filtro (opcional).
- Guardar el contenido del JSON en un Elasticsearch.
Aunque, primero deberemos arrancar nuestro entorno de trabajo, compuesto de dos únicos componentes: Apache NiFi y Elasticsearch. Será muy simple ya que lo haremos mediante Docker a partir de estos commands:
Arrancar Dockers con Apache NiFi y Elasticsearch
Arrancaremos primero el contenedor Docker con Elasticsearch:
sudo docker run -d --name elastic -p 9200:9200 -p 9300:9300 elasticsearch
Seguidamente y es opcional, levantaremos Kibana para poder controlar más visualmente a Elasticsearch. En este caso pasaremos como parámetro de configuración la URL de Elasticsearch: -e ELASTICSEARCH_URL=http://IP-HOST-DOCKER-ELASTIC:9200
sudo docker run -d --name kibana -p 5601:5601 -e ELASTICSEARCH_URL=http://IP-HOST-DOCKER-ELASTIC:9200 kibana
Una vez levantados los Docker con Elasticsearch y Kibana, procederemos a levantar Apache NiFi:
sudo docker run -d --name kibana -p 8080:8080 apache/nifi
Una vez finalizado, ya podremos empezar a construir el Workflow mediante Apache NiFi. Accederemos a partir de la URL: http://IP-HOST-DOCKER-NIFI:8080
GetTwitter
Ya dentro de Apache NiFi, vamos a construir un procesador, utilizaremos el llamado “GetTwitter” y lo configuraremos muy fácilmente. Si bien es cierto, deberemos tener, antes, unos datos de la API de Twitter, los siguientes:
- Consumer Key (API Key)
- Consumer Secret (API Secret)
- Access Token
- Access Token Secret
El siguiente paso será configurar las propiedades del procesador, donde deberemos introducir tanto los datos anteriores como el filtro (Terms to Filter On) que querramos, por ejemplo un hashtag o palabra clave. Podemos verlo en la siguiente imagen de forma resumida:
Transform to JSON Format
Seguidamente, miraremos de transformar la información que nos devuelve la API de Twitter en un fichero JSON, en este caso, miraremos de clasificar a la vez tanto la lengua del tweet como el própio contenido del texto. Para ello añadiremos el procesador llamado “EvaluateJsonPath” y lo configuraremos con las siguientes propiedades, podemos verlo en la siguiente imagen de forma resumida también:
Route and Attribute
Ahora, una vez ya le hemos dado un formato más estructurado a los datos descargados de Twitter, vamos a mirar de hacer el routing hacia Elasticsearch añadiendo un nuevo procesador llamado “RouteOnAttribute” con la propiedad, por ejemplo: el filtrar por lengua y por contenido. Básicamente aprovecharemos la clasificación añadida anteriormente. Veamos entonces la imagen para entenderlo mejor:
Put to Elasticsearch
Y ya por último, añadiremos el procesador llamado “PutElasticsearchHttp” que será el que hará el Put de datos a Elasticsearch. Aquí establaceremos la propiedads siguientes:
- Elasticsearch URL: será la URL + puerto de comunicación.
- Index: el índice de Elasticsearch que usaremos.
- Type: default
Todas las otras propiedades las podemos dejar por defecto. Veamos la imagen:
Conectar los procesadores
Ahora ya último paso de todo será conectar todos los procesadores. Deberemos establecer el routing tanto de “Failure”, “Retry” y “Success”.
Visualizar los datos introducidos a Elasticsearch mediante kibana
Tras poner en marcha el Workflow, podremos dirigirnos a Kibana para poder visualizar la entrada de los datos y ya podremos dar por finalizado la construcción de esta carga de datos.
Autor: Joakim Vivas