Although relational database systems such as MySQL and Postgres offer high performance and replication, they weren’t built from the ground up for Big Data that’s distributed across multiple systems.
The Hadoop platform, on the other hand, was built expressly for that purpose—but Hadoop isn’t so much a database as a platform built for distributed computing and filesystems. Apache Hive, in turn, is a database system that’s built on top of Hadoop. Hive includes its own variant of SQL that allows you to create tables and store data across a distributed Hadoop filesystem, and query the data. The code for the query can be distributed across servers within a cluster to optimize for performance.
But there’s a caveat here: When we talk about Hive, we’re not necessarily talking high performance and speed within each separate query. Code portions running on separate servers may or may not be faster than a similar query in MySQL or Postgres. Rather, we’re talking about a division of labor, whereby you can have several (or even hundreds or thousands of) nodes in a cluster, each running a portion of the query. With that approach, you can process terabytes of data in a short timeframe, much faster than a similar query on a non-distributed system.
Up to Speed with MapReduce
In order to get the most out of Hive, you need to understand the concept of MapReduce. A concept created by Google, MapReduce is first mentioned in this paper, if you want to take a deep dive.
Here’s a quicker explanation: With MapReduce, you first pull the data you need from a table or joined set of tables; this is called the map process. Then you condense that data into a smaller result, such as a count or a sum; this is called the reduce.
Perhaps the simplest example of a MapReduce is a row count. Suppose you want to know how many rows are in a table. The map portion gathers all the rows in the table (which is a set of data); the reduce portion condenses that to a single number (the count, which is a scalar, or single data item). So mathematically, you’re going from a set of data to a smaller set or even a single number.
Here’s another example: you might want to know how many rows in the table meet certain criteria. First you take the rows that meet those criteria (the map step), and then you count the results (the reduce step).
(A quick side-note: Don’t read the names ‘Map’ and ‘Reduce’ too literally. You could also do a where clause, and the results of the where clause would still be your mapped data, even though technically you’re only getting a subset (i.e. a reduced amount) of the records in the table.)
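The two counting examples above can be sketched in plain Python, outside of Hive entirely. This only illustrates the idea as this article describes it; the sample rows and the function names `map_step` and `reduce_step` are made up for the example.

```python
# Toy rows standing in for a table; the values are invented for illustration.
rows = [
    {"id": 1, "name": "George Washington", "department": 1},
    {"id": 2, "name": "Thomas Jefferson", "department": 1},
    {"id": 3, "name": "Fred Smith", "department": 2},
]


def map_step(table, predicate):
    """Gather the rows of interest (the 'map' step as described above)."""
    return [row for row in table if predicate(row)]


def reduce_step(mapped_rows):
    """Condense the gathered rows to a single number: the count."""
    return len(mapped_rows)


# Count every row, then count only the rows in department 1.
total_rows = reduce_step(map_step(rows, lambda row: True))
dept_one_rows = reduce_step(map_step(rows, lambda row: row["department"] == 1))
print(total_rows, dept_one_rows)
```

Here the predicate plays the role of the where clause from the side-note: it narrows the mapped data, and the reduce step is unchanged.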
By understanding how database lookups work well with the MapReduce concept, you can see how you could divide up labor across multiple servers within a cluster, or at least multiple processes on a single server. But to make this work, you have to understand two more concepts in how your data is organized and retrieved: partitions and clusters.
Partitioning in Hive refers to how your data is physically stored across separate files. For example, you might be gathering large amounts of data daily, and saving it. A natural approach in that case could be to divide each day’s data into a separate partition. This concept works nicely inside Hadoop, and even in cloud storage systems such as Amazon S3.
By partitioning your files, you can allow separate processes within a single MapReduce to read the individual files and operate on them separately in parallel. So how do you configure your table for partitioning? You use the good old SQL statement CREATE TABLE. Hive lets you additionally specify how you want your data partitioned; here’s an example using Hive’s breed of SQL called HiveQL:
CREATE EXTERNAL TABLE IF NOT EXISTS employees (
    id INT,
    name STRING,
    department INT
)
PARTITIONED BY (company STRING)
STORED AS ORC
LOCATION "/data/employees";
This tells Hive to create a table whose data lives outside of Hive (i.e., externally), in the local Hadoop file system’s /data/employees directory. The table will be partitioned by a field called company, and will be stored using ORC, a columnar file format originally created for Hive. (For more info on ORC, check out this Apache Software Foundation Website.)
Now you can insert data into the table. Here are four inserts; the first three go into one partition, and the fourth goes into a second partition:
INSERT INTO TABLE employees PARTITION(company = 'DICE') values(1, 'George Washington', 1);
INSERT INTO TABLE employees PARTITION(company = 'DICE') values(2, 'Thomas Jefferson', 1);
INSERT INTO TABLE employees PARTITION(company = 'DICE') values(3, 'Fred Smith', 2);
INSERT INTO TABLE employees PARTITION(company = 'APACHE') values(1, 'Abraham Lincoln', 1);
Note that these inserts are rather slow. Hive is designed not so much as a general-purpose database where you’re doing lots of reads and writes all day, but more as a system where you pull large amounts of data from other databases that perform lots of reads and writes, and then start doing massive data analysis inside Hive. In these cases, you would pull data in blocks from a separate source rather than running individual insert statements.
After these inserts run, you can see how the data is physically partitioned within the Hadoop File System:
[hadoop@ip-11-0-0-169 ~]$ hadoop fs -ls /data
Found 1 items
drwxr-xr-x - hadoop hadoop 0 2016-08-17 11:50 /data/employees
[hadoop@ip-11-0-0-169 ~]$ hadoop fs -ls /data/employees
Found 2 items
drwxr-xr-x - hadoop hadoop 0 2016-08-17 11:50 /data/employees/company=APACHE
drwxr-xr-x - hadoop hadoop 0 2016-08-17 11:49 /data/employees/company=DICE
[hadoop@ip-11-0-0-169 ~]$ hadoop fs -ls /data/employees/company=DICE
Found 1 item
-rw-r--r-- 1 hadoop hadoop 376 2016-08-17 11:48 /data/employees/company=DICE/000000_0
[hadoop@ip-11-0-0-169 ~]$
There’s a directory for the employees table, and then separate directories under that for each partition, APACHE and DICE.
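To make the directory layout in the listing concrete, here is a small Python sketch that groups the four inserted rows by the directory they would land in. It does not talk to Hadoop at all; the helper name `partition_dir` is hypothetical, and the only thing taken from the listing is the `company=VALUE` naming of the partition directories.

```python
from collections import defaultdict

TABLE_LOCATION = "/data/employees"  # the LOCATION from the CREATE TABLE statement

# (company, id, name, department) tuples mirroring the four inserts above.
inserted = [
    ("DICE", 1, "George Washington", 1),
    ("DICE", 2, "Thomas Jefferson", 1),
    ("DICE", 3, "Fred Smith", 2),
    ("APACHE", 1, "Abraham Lincoln", 1),
]


def partition_dir(company):
    """Build the directory path for one value of the partition column."""
    return f"{TABLE_LOCATION}/company={company}"


# Group rows by the directory they would land in. The partition value is
# carried by the directory name rather than repeated inside each data row.
layout = defaultdict(list)
for company, emp_id, name, department in inserted:
    layout[partition_dir(company)].append((emp_id, name, department))

for path in sorted(layout):
    print(path, len(layout[path]))
```

A reader that only cares about one company can open that one directory and ignore the rest, which is the point of partitioning.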
Suppose you want a count of the rows in each partition. You can do this using a query that looks just like a traditional SQL statement:
SELECT company, COUNT(*) from employees GROUP BY company;
The main idea behind partitioning is that you can have multiple processes that might operate on individual partitions. So you might have one application that does this:
SELECT count(*) from employees where company = 'DICE';
While another might use a separate partition:
SELECT count(*) from employees where company = 'APACHE';
These two queries can run independently and in parallel without any trouble because they use separate partitions.
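As a rough analogy in Python (not how Hive schedules work internally), two workers can each count a different partition at the same time because neither needs anything from the other. The `partitions` dictionary is a hypothetical in-memory stand-in for the two partition directories.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory stand-in for the two partitions created earlier.
partitions = {
    "DICE": ["George Washington", "Thomas Jefferson", "Fred Smith"],
    "APACHE": ["Abraham Lincoln"],
}


def count_partition(company):
    """Count rows by reading only the named partition."""
    return company, len(partitions[company])


# Each count touches a different partition, so the workers never share data
# and can run at the same time.
with ThreadPoolExecutor(max_workers=2) as pool:
    counts = dict(pool.map(count_partition, ["DICE", "APACHE"]))

print(counts)
```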
However, there’s an important thing to note. If you try this out, you’ll see that it takes a very long time just to get a count on a few little rows—far longer than if you were using, for example, MySQL. Why is that? Hive is meant for huge sets of data. While it might take forever for a small number of rows, if you organize your data well, you can process millions or billions of rows much more quickly than otherwise.
But you’re not limited to just reading the data separately per partition. The other aspect to maximizing performance and scalability is how you cluster your data. Clustering works much like ordering your data in a SQL query. The difference is that clustering allows you to retrieve your data in chunks that can be distributed across servers, even if they’re not partitioned separately.
HiveQL allows you to use an ORDER BY clause, as you would with traditional SQL, but it’s not optimal: producing one total ordering forces all of the data through a single reducer, so the work can’t be easily divided up. Instead, you have two other options that can be combined.
The first is to distribute your data using a field. For example, if you have an employee table that contains a department, you can distribute your data by department using a DISTRIBUTE BY clause. This will split up the resulting data into chunks organized by department. Each chunk can then be processed individually in parallel on a separate process or separate servers, allowing for division of labor.
Additionally, you can sort each chunk on another field using a SORT BY clause. For example, you might DISTRIBUTE BY department, and then SORT BY last name, which will give you a separate chunk of data for each department, with each chunk sorted by last name. These separate chunks can then be pushed out to separate servers for their own processing in parallel.
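Because these two steps (distribute and sort) are so useful together, Hive includes a single HiveQL clause that does both. It’s called CLUSTER BY, and it’s shorthand for distributing and sorting on the same field or fields: CLUSTER BY department means DISTRIBUTE BY department SORT BY department. When you want to distribute by one field and sort by another, as in the department and last name example, you still write the two clauses separately.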
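The distribute-then-sort idea can be sketched in a few lines of Python. This is a simplified model, not Hive’s implementation: the sample rows are invented, the function names are hypothetical, and rows are routed with a plain modulo on an integer department where Hive applies its own hashing.

```python
NUM_REDUCERS = 3  # assumed for the sketch

# (department, last_name) pairs; invented sample data.
employees = [
    (1, "Washington"),
    (2, "Smith"),
    (1, "Jefferson"),
    (3, "Lincoln"),
    (2, "Adams"),
]


def distribute_by(rows, key_index, num_reducers):
    """Send every row with the same key to the same chunk."""
    chunks = [[] for _ in range(num_reducers)]
    for row in rows:
        # Simplification: modulo on an integer key. Hive's own hashing differs.
        chunks[row[key_index] % num_reducers].append(row)
    return chunks


def sort_by(chunks, key_index):
    """Sort each chunk on its own; no ordering is imposed across chunks."""
    return [sorted(chunk, key=lambda row: row[key_index]) for chunk in chunks]


chunks = sort_by(distribute_by(employees, 0, NUM_REDUCERS), 1)
for reducer_id, chunk in enumerate(chunks):
    print(reducer_id, chunk)
```

With three reducers and three departments, each chunk happens to hold exactly one department. With fewer reducers than departments, one chunk would hold several departments, and sorting by last name alone would interleave them; sorting on department first and then last name keeps each department’s rows together.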
Reducing Your Data
This now leads to the second part of the MapReduce concept. When you distribute your data, where exactly does it go? It goes to code called a reducer. You can run multiple copies of the reducer code on separate processes or separate servers to allow for higher performance.
Suppose you want to add up the total salary by department, and suppose our table also had a salary field. A traditional SQL statement might look like this:
SELECT department, sum(salary) FROM employees WHERE company = 'DICE' GROUP BY department;
To see how this can be optimized, consider that the data within each department is independent. Summing one department can happen without needing a result from a separate department. (If a process doesn’t need data from a separate process, you can divide up the code to run in parallel.) If you divide up your data by department, you can push this clustered data onto separate servers or processes and run the summing in parallel. The code that does each summation is called a reducer, and Hive lets you run multiple reducers in parallel. Hive already routes rows this way when it runs a GROUP BY, but you can control the routing yourself with the aforementioned DISTRIBUTE BY:
SELECT department, salary FROM employees WHERE company = 'DICE' DISTRIBUTE BY department;
Notice the difference between this and the traditional SQL statement: DISTRIBUTE BY only decides which reducer each row goes to, so all the rows for a department land on the same reducer, but it doesn’t add anything up. The summing still has to come from somewhere: a GROUP BY, as in the traditional statement, or a reducer script of your own.
That’s it! Now when you run this query, Hive will divide the processing up among reducers. How many processes and servers it uses depends on what you’ve allocated, and what’s available.
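The reason this division of labor is safe can be checked with a short Python sketch: routing rows to reducers by department and summing inside each reducer gives the same totals as summing everything in one place. The salary figures and function names are invented, the routing is a plain modulo rather than Hive’s hashing, and the reducers run one after another here rather than in parallel.

```python
from collections import defaultdict

# (department, salary) pairs; the salary figures are invented.
rows = [(1, 50000), (2, 42000), (1, 61000), (3, 38000), (2, 45000)]


def sum_by_department(chunk):
    """What one reducer does: total the salaries for the departments it holds."""
    totals = defaultdict(int)
    for department, salary in chunk:
        totals[department] += salary
    return dict(totals)


def run_with_reducers(data, num_reducers):
    """Route rows to reducers by department, reduce each chunk, merge results."""
    chunks = [[] for _ in range(num_reducers)]
    for department, salary in data:
        chunks[department % num_reducers].append((department, salary))
    merged = {}
    for chunk in chunks:
        # No department appears in two chunks, so a plain update is safe.
        merged.update(sum_by_department(chunk))
    return merged


single = sum_by_department(rows)
distributed = run_with_reducers(rows, num_reducers=2)
print(distributed == single)
```

The merge step can be a simple dictionary update only because every row for a given department was sent to the same reducer; if rows for one department were scattered, the partial sums would have to be added together instead.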
This is just scratching the surface of Hive. One cool aspect of Hive is that you can write your own reducer scripts in other languages such as Python, and have the results of the distributed data pushed into your scripts as an incoming text stream. You can then process that data any way you like with your Python code. Hive routes each set obtained by a DISTRIBUTE BY clause to a reducer, and the rows arrive sorted as specified in a SORT BY clause. Then you write the results back to standard output, which will send the data back to Hive. Check out this and more features in the official documentation.
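A reducer script of the kind described above might look like the following Python sketch. It assumes each incoming line holds two tab-separated columns, a department and a salary, and that rows for the same department arrive next to each other (which sorting on department would give you). The column layout and function names are assumptions for illustration, and wiring the script into a query with Hive’s TRANSFORM clause is left to the official documentation.

```python
import io
import sys
from itertools import groupby


def parse(line):
    """Split one tab-separated input line into (department, salary)."""
    department, salary = line.rstrip("\n").split("\t")
    return department, int(salary)


def reduce_lines(lines):
    """Yield one 'department<TAB>total' line per run of equal departments.

    Assumes rows for a department arrive together, as they would if the
    input were sorted on department.
    """
    parsed = (parse(line) for line in lines if line.strip())
    for department, group in groupby(parsed, key=lambda pair: pair[0]):
        yield f"{department}\t{sum(salary for _, salary in group)}"


def main(stream_in=sys.stdin, stream_out=sys.stdout):
    """Read rows from the input stream and write totals to the output stream."""
    for out_line in reduce_lines(stream_in):
        stream_out.write(out_line + "\n")


# Demonstration with in-memory streams; a real script would just call main().
sample = io.StringIO("1\t50000\n1\t61000\n2\t42000\n")
result = io.StringIO()
main(sample, result)
print(result.getvalue(), end="")
```

Because `groupby` only merges adjacent rows, the script never has to hold a whole department in memory, but it does depend on the sorted-input assumption stated above.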