Connecting NiFi to Elasticsearch

I am trying to solve a problem and would appreciate any help: links to documentation, forums, or FAQs other than https://cwiki.apache.org/confluence/display/NIFI/FAQs , or any useful answer in this post =).

So, I have the following task. The first part of my system collects data every 5-15 minutes from different DB sources. Then I remove duplicates and garbage, combine the data from the different sources according to some logic, and pass it on to the second part of the system as multiple threads. As far as I know, NiFi is well suited to this =).

Currently, I can successfully pull data from InfluxDB with the GetHTTP processor. However, I cannot configure the same processor to retrieve data from Elasticsearch with all the required parameters. I would like to fetch data every 5-15 minutes (depending on the scheduler period) for the time window from "now minus 5-15 minutes" to "now", with several additional filters. If I understand correctly, this can be achieved either by subscribing to "_index" or by querying the database at a regular interval.

I know NiFi has several processors designed specifically for Elasticsearch (FetchElasticsearch5, FetchElasticsearchHttp, QueryElasticsearchHttp, ScrollElasticsearchHttp) as well as the GetHTTP and PostHTTP processors. Unfortunately, I could not find documentation or, better still, examples showing how to set their "Properties" for my purposes =(.

What is the difference between FetchElasticsearchHttp and QueryElasticsearchHttp? Which one is best for my task? What is the difference between GetHTTP and QueryElasticsearchHttp besides a few specific fields? Will GetHTTP work just as well if I configure it the way I need?

Any advice?

I would be grateful for any help.



1 answer


The ElasticsearchHttp processors try to simplify interaction with ES by making the appropriate REST API call based on the properties you set. If you know the complete URL you want, you can use GetHTTP or InvokeHTTP. With the ElasticsearchHttp processors, by contrast, you only enter what you are looking for, and the processor generates the URL and returns the results.

FetchElasticsearch (and its variants) is used to get a specific document when you know its ID. It is sometimes used after a search/query to retrieve documents one at a time once you know which ones you need.
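To make the "fetch by ID" case concrete, here is a minimal sketch of the kind of URL such a lookup boils down to. It assumes the standard Elasticsearch document GET endpoint; the host, the index name `logs`, the ID, and the helper `document_url` are all made up for illustration, and older Elasticsearch versions use a mapping type name in place of `_doc`.

```python
from urllib.parse import quote


def document_url(base_url, index, doc_id, doc_type="_doc"):
    """Build the URL for fetching one document by its ID.

    doc_type defaults to "_doc"; older clusters expect a real type name here.
    """
    # Percent-encode each path segment so unusual IDs do not break the URL.
    parts = [quote(str(p), safe="") for p in (index, doc_type, doc_id)]
    return base_url.rstrip("/") + "/" + "/".join(parts)


# Hypothetical host, index and ID, for illustration only.
url = document_url("http://localhost:9200", "logs", "abc 1")
print(url)
```

In NiFi you would not build this URL yourself; the Fetch processors take the index, type and document identifier as properties and do the equivalent for you.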



QueryElasticsearchHttp is for running a Lucene-style query when you don't know in advance which documents you want. It returns at most index.max_result_window results for that index. To get more entries, you can use ScrollElasticsearchHttp. NOTE: QueryElasticsearchHttp expects a query that works as the "q" parameter of the URL. This "mini-language" does not support all fields / operators (see here for more details).
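As a rough illustration of what "a query that works as the q parameter" means, the sketch below builds a search URL by hand. The host, the index `logs`, the field names `level` and `host`, and the helper `search_url` are assumptions for the example, not anything taken from the processor itself.

```python
from urllib.parse import urlencode


def search_url(base_url, index, query, size=10):
    """Build a URI-search URL that passes a Lucene-style query as `q`."""
    # urlencode escapes characters such as ':' and spaces in the query text.
    params = urlencode({"q": query, "size": size})
    return f"{base_url.rstrip('/')}/{index}/_search?{params}"


# Hypothetical index and field names, for illustration only.
url = search_url("http://localhost:9200", "logs", "level:ERROR AND host:web01", size=100)
print(url)
```

The text you would put in the processor's query property corresponds to the `query` argument here; anything that cannot be expressed in that mini-language needs a full JSON request body instead.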

For your use case, you probably need InvokeHTTP to issue the type of query you are describing. This article describes how to issue a query for the last 15 minutes. After your results are returned, you may need some combination of EvaluateJsonPath and / or SplitJson to work with the individual documents. See the Elasticsearch REST API documentation (and the NiFi processor documentation) for more details.
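A minimal sketch of such a request body, assuming the documents carry a date field called `@timestamp` and that the extra filters are exact-match terms (both are assumptions; adjust to your mapping). It uses Elasticsearch date math (`now-15m`) in a range filter, and the second helper mimics what SplitJson on `$.hits.hits` would give you. The function names and the `status` field are made up for illustration.

```python
import json


def recent_window_query(minutes, time_field="@timestamp", filters=None):
    """Build a search body matching documents from the last `minutes` minutes."""
    clauses = [{"range": {time_field: {"gte": f"now-{minutes}m", "lte": "now"}}}]
    # Each extra filter becomes an exact-match term clause (an assumption).
    for field, value in (filters or {}).items():
        clauses.append({"term": {field: value}})
    return {"query": {"bool": {"filter": clauses}}}


def split_hits(response_text):
    """Return the _source of every hit in a search response."""
    response = json.loads(response_text)
    return [hit["_source"] for hit in response["hits"]["hits"]]


# Hypothetical filter field, for illustration only.
body = recent_window_query(15, filters={"status": "active"})
print(json.dumps(body, indent=2))
```

The printed JSON is what you would send as the POST body to the index's `_search` endpoint from InvokeHTTP. Note that a fixed "now minus N minutes" window can overlap or leave gaps if the schedule drifts, so you may still want the duplicate removal you already planned.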
