Elastic{ON}16 – Takeaways – Part 3

Probably one of my biggest takeaways from Elastic{ON} was what I learned about Elasticsearch's integration with Hadoop, Hive and Spark.

Currently I do some work integrating Hive with Elasticsearch. I had noticed that Hive seemed to retrieve all the records first and only then filter the result set based on the query. I also noticed that queries I would consider simple, such as a plain count, still ran a full MapReduce job and took over 80 seconds:

0: jdbc:hive2://localhost:10000> SELECT COUNT(*) FROM testElasticSearch;
INFO : Number of reduce tasks determined at compile time: 1
INFO : In order to change the average load for a reducer (in bytes):
INFO : set hive.exec.reducers.bytes.per.reducer=<number>
INFO : In order to limit the maximum number of reducers:
INFO : set hive.exec.reducers.max=<number>
INFO : In order to set a constant number of reducers:
INFO : set mapreduce.job.reduces=<number>
INFO : number of splits:5
INFO : Submitting tokens for job: job_local503289697_0029
INFO : The url to track the job: http://localhost:8080/
INFO : Job running in-process (local Hadoop)
INFO : 2016-03-29 23:52:44,846 Stage-1 map = 0%, reduce = 0%
INFO : 2016-03-29 23:52:50,854 Stage-1 map = 5%, reduce = 0%
INFO : 2016-03-29 23:52:53,859 Stage-1 map = 7%, reduce = 0%
INFO : 2016-03-29 23:52:56,871 Stage-1 map = 9%, reduce = 0%
INFO : 2016-03-29 23:52:58,875 Stage-1 map = 12%, reduce = 0%
INFO : 2016-03-29 23:53:01,882 Stage-1 map = 100%, reduce = 0%
INFO : 2016-03-29 23:53:07,896 Stage-1 map = 25%, reduce = 0%
INFO : 2016-03-29 23:53:10,902 Stage-1 map = 28%, reduce = 0%
INFO : 2016-03-29 23:53:13,909 Stage-1 map = 30%, reduce = 0%
INFO : 2016-03-29 23:53:17,209 Stage-1 map = 100%, reduce = 0%
INFO : 2016-03-29 23:53:23,221 Stage-1 map = 45%, reduce = 0%
INFO : 2016-03-29 23:53:26,229 Stage-1 map = 48%, reduce = 0%
INFO : 2016-03-29 23:53:29,236 Stage-1 map = 50%, reduce = 0%
INFO : 2016-03-29 23:53:32,243 Stage-1 map = 100%, reduce = 0%
INFO : 2016-03-29 23:53:38,256 Stage-1 map = 65%, reduce = 0%
INFO : 2016-03-29 23:53:41,264 Stage-1 map = 68%, reduce = 0%
INFO : 2016-03-29 23:53:44,271 Stage-1 map = 71%, reduce = 0%
INFO : 2016-03-29 23:53:47,278 Stage-1 map = 73%, reduce = 0%
INFO : 2016-03-29 23:53:48,281 Stage-1 map = 100%, reduce = 0%
INFO : 2016-03-29 23:53:54,298 Stage-1 map = 85%, reduce = 0%
INFO : 2016-03-29 23:53:57,311 Stage-1 map = 88%, reduce = 0%
INFO : 2016-03-29 23:54:00,320 Stage-1 map = 91%, reduce = 0%
INFO : 2016-03-29 23:54:03,328 Stage-1 map = 100%, reduce = 100%
INFO : Ended Job = job_local503289697_0029
+---------+--+
| _c0 |
+---------+--+
| 371070 |
+---------+--+
1 row selected (80.921 seconds)

What happened to _count? I had assumed the integration would examine the query before Hive executed it and rewrite it into something more efficient, such as a single call to Elasticsearch's _count API. At the conference, I learned that because of limitations on the Hive side, this simply isn’t possible yet. Costin referred to this operation as “push-down”. Currently Spark is the only integration that supports push-down.
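
To picture what push-down would buy, here is a minimal Python sketch; it is an illustration, not how es-hadoop works internally. `count_without_pushdown` mimics what Hive appeared to do (pull every record back, then count), while `build_count_request` expresses the same count as a single request to Elasticsearch's `_count` endpoint. The function names and the toy `DOCS` list are hypothetical, and the request is only built, never sent.

```python
import json

# Hypothetical stand-in for the documents in testdata/stringarray.
DOCS = [
    {"userid": {"id": 1}},
    {"userid": {"id": 2}},
    {"userid": {"id": 1}},
]


def lookup(doc, dotted):
    """Follow a dotted path such as 'userid.id' through nested dicts."""
    for key in dotted.split("."):
        if not isinstance(doc, dict):
            return None  # path runs into a list or scalar
        doc = doc.get(key)
    return doc


def count_without_pushdown(docs, field=None, value=None):
    """No push-down: every document is pulled back to the client,
    then filtered and counted there."""
    fetched = list(docs)  # in a real cluster, every record crosses the network
    if field is None:
        return len(fetched)
    return sum(1 for doc in fetched if lookup(doc, field) == value)


def build_count_request(resource, field=None, value=None):
    """Push-down: express the same count as one Elasticsearch _count request,
    so the cluster does the counting and returns a single number."""
    if field is None:
        query = {"match_all": {}}
    else:
        query = {"bool": {"filter": [{"term": {field: value}}]}}
    return "/" + resource + "/_count", json.dumps({"query": query})


print(count_without_pushdown(DOCS, "userid.id", 1))
print(build_count_request("testdata/stringarray", "userid.id", 1))
```

With the toy data, `count_without_pushdown(DOCS, "userid.id", 1)` returns 2 only after touching all three documents, whereas the pushed-down request would let the cluster return that number directly.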

Costin also addressed another challenge I’ve been facing, which I like to call the field identity crisis. An Elasticsearch mapping does not explicitly identify which fields are arrays, because any field can hold either a single value or an array of values. Queries can therefore fail when a field that holds a single value in one document turns up as an array in another.
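
Since the mapping will not tell us which fields hold arrays, one workaround is to ask the documents themselves. Below is a minimal Python sketch, assuming a sample of `_source` documents has already been loaded into dicts (how you fetch them is left out). `shape` and `inconsistent_fields` are hypothetical helpers that flag top-level fields whose shape changes from one document to the next.

```python
from collections import defaultdict


def shape(value):
    """Classify a JSON value by the shape a Hive column type cares about."""
    if isinstance(value, list):
        return "array"
    if isinstance(value, dict):
        return "object"
    return "scalar"


def inconsistent_fields(docs):
    """Return {field: set_of_shapes} for top-level fields whose shape differs
    between documents, e.g. an object in one and an array in another."""
    seen = defaultdict(set)
    for doc in docs:
        for field, value in doc.items():
            seen[field].add(shape(value))
    return {field: shapes for field, shapes in seen.items() if len(shapes) > 1}


sample = [
    {"userid": {"id": 1}},
    {"userid": [{"id": 1}, {"id": 2}]},
    {"name": "a"},
]
print(inconsistent_fields(sample))
```

Here `userid` is reported with both the `object` and `array` shapes, while `name` is left out because it is always a scalar.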

Take this sample mapping:

{
   "testdata": {
      "mappings": {
         "stringarray": {
            "properties": {
               "userid": {
                  "properties": {
                     "id": {
                        "type": "long"
                     }
                  }
               }
            }
         }
      }
   }
}

In Hive, we would define the table as something like this:

CREATE EXTERNAL TABLE testdata (userid STRUCT<id:bigint>)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'testdata/stringarray',
'es.nodes' = 'xxxx',
'es.port' = '9200');

Now, when we query the table, we get this error:

SELECT * FROM testdata;

Error: java.io.IOException: java.lang.ClassCastException (state=,code=0)

We receive this vague error because the userid field is sometimes actually an array of objects rather than a single object, as in this document:

"userid": [
   {
      "id": 1
   },
   {
      "id": 2
   }
]

A document like this would instead need the following table definition:

CREATE EXTERNAL TABLE testdata (userid ARRAY<STRUCT<id:bigint>>)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'testdata/stringarray',
'es.nodes' = 'xxxx',
'es.port' = '9200');
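
Choosing between the two definitions can be driven by the data. This is a minimal Python sketch, assuming you already have sample values for a field; `hive_type` is a hypothetical helper, not es-hadoop's own type mapping, and it widens a column to `ARRAY<...>` as soon as any sample is a list. It only handles a handful of JSON shapes.

```python
def hive_type(values):
    """Infer a Hive column type from sample JSON values of one field.

    If any sample is a list, the column is widened to ARRAY<...> so that
    single-value and multi-value documents both fit the same definition.
    """
    if not values:
        return "string"  # nothing to go on; fall back to string
    if any(isinstance(v, list) for v in values):
        elements = []
        for v in values:
            # Treat a lone value as a one-element array.
            elements.extend(v if isinstance(v, list) else [v])
        return "ARRAY<" + hive_type(elements) + ">"
    if all(isinstance(v, dict) for v in values):
        keys = sorted({k for v in values for k in v})
        fields = [k + ":" + hive_type([v[k] for v in values if k in v]) for k in keys]
        return "STRUCT<" + ",".join(fields) + ">"
    if all(isinstance(v, bool) for v in values):
        return "boolean"
    if all(isinstance(v, int) and not isinstance(v, bool) for v in values):
        return "bigint"
    if all(isinstance(v, (int, float)) and not isinstance(v, bool) for v in values):
        return "double"
    return "string"  # mixed or textual values


print(hive_type([{"id": 1}]))
print(hive_type([{"id": 1}, [{"id": 1}, {"id": 2}]]))
```

For the two `userid` shapes shown above, the first call returns `STRUCT<id:bigint>` and the second returns `ARRAY<STRUCT<id:bigint>>`, matching the two table definitions.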

 

This is a challenge for Hive: the data needs to stay consistent in its type, or we will not be able to retrieve all the records from the index through Hive.
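
If you control how documents are indexed, another option is to make the shape consistent before the data reaches Elasticsearch. A minimal Python sketch, assuming an ingest step you can modify: `normalize_arrays` is a hypothetical helper that wraps single values in a list for the fields you name, so every document fits an `ARRAY<STRUCT<id:bigint>>` column.

```python
import copy


def normalize_arrays(doc, array_fields):
    """Return a copy of doc where each field named in array_fields is a list.

    A single value is wrapped in a one-element list; lists, missing fields
    and nulls are left alone. The input document is not modified.
    """
    fixed = copy.deepcopy(doc)
    for field in array_fields:
        value = fixed.get(field)
        if value is not None and not isinstance(value, list):
            fixed[field] = [value]
    return fixed


print(normalize_arrays({"userid": {"id": 1}}, ["userid"]))
print(normalize_arrays({"userid": [{"id": 1}, {"id": 2}]}, ["userid"]))
```

The trade-off is that every consumer of the index now sees `userid` as an array, even for documents that only ever had one value.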
