
Introduction to Hive transactions

Updates, transactions, and indexes are mainstays of traditional databases. Yet, until recently, these features had not been considered a part of Hive’s feature set. This is because Hive was built to operate over HDFS data using MapReduce, where full-table scans are the norm and a table update is achieved by transforming the data into a new table. For a data warehousing application that runs over large portions of the dataset, this works well.
However, there are scenarios where appending inserts and applying updates are needed. On the transactions front, Hive does not define clear semantics for concurrent access to tables, which means applications need to build their own application-level concurrency or locking mechanisms.
Up until Hive 0.13, atomicity, consistency, and durability were provided only at the partition level. Isolation could be provided by turning on one of the available locking mechanisms (ZooKeeper or in-memory). With the transaction support introduced in Hive 0.13 and completed in Hive 0.14, it is now possible to provide full ACID semantics at the row level, so that one application can add rows while another reads from the same partition without interfering with each other.

Updates & Insert Appends in Older versions of Hive (before 0.14)

To achieve updates & insert appends in older versions of Hive (before 0.14), the following incremental approach can be used.
This incremental data feed into Hive has several advantages:
• Since external tables are used, refreshing data becomes easy. Data can be refreshed simply by adding files to the HDFS directory.
• The steps followed in this approach can be coordinated in an Oozie workflow, so the whole process can run as a scheduled job.

To understand the incremental approach, let us take an example of sales data.

Step 1: Create a base table that holds the sales data.

CREATE EXTERNAL TABLE base_sales
(
sell_id BIGINT,
payment_type_id STRING,
total_amt DOUBLE,
store_id STRING,
timestamp STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/ldap/hivetest/base_sales'
TBLPROPERTIES ('primary_key_columns' = 'sell_id');

Step 2: Create a stage table that holds the new data. It may contain updates as well as newly inserted records.

CREATE EXTERNAL TABLE stg_sales
(
sell_id BIGINT,
payment_type_id STRING,
total_amt DOUBLE,
store_id STRING,
timestamp STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/ldap/hivetest/stg_sales'
TBLPROPERTIES ('primary_key_columns' = 'sell_id');

Step 3: Create a view that combines the records from both the base table and the stage table. The view returns only the most recent version of each record, so updates are reflected in the output and newly inserted records are also visible.

CREATE VIEW update_view AS
SELECT t1.* FROM
(SELECT * FROM base_sales
UNION ALL
SELECT * FROM stg_sales) t1
JOIN
(SELECT sell_id, max(timestamp) max_timestamp FROM
(SELECT * FROM base_sales
UNION ALL
SELECT * FROM stg_sales) t2
GROUP BY sell_id) s
ON t1.sell_id = s.sell_id AND t1.timestamp = s.max_timestamp;
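
To illustrate how the view resolves updates, consider a hypothetical example (the rows and values below are made up purely for illustration and are not part of the original data):

-- Hypothetical contents, for illustration only:
-- base_sales : (100, 'CASH', 50.0, 'S1', '2015-01-01 10:00:00')
-- stg_sales  : (100, 'CASH', 75.0, 'S1', '2015-02-01 09:30:00')   -- updated amount for sale 100
--              (101, 'CARD', 20.0, 'S2', '2015-02-01 11:15:00')   -- brand new sale
SELECT * FROM update_view WHERE sell_id IN (100, 101);
-- Returns the newer row for sell_id 100 (total_amt = 75.0) and the new row for
-- sell_id 101; the stale base_sales row for 100 is filtered out because its
-- timestamp does not match max(timestamp) for that sell_id.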

Step 4: The view created in Step 3 holds all the most recent records. This step writes those records from the view back into the base table.

INSERT OVERWRITE TABLE base_sales SELECT * FROM update_view;

Alternative way to perform transactions in older versions of Hive

The above approach picks up all the existing data and rewrites it back to the table, so it does not scale well.
To overcome this issue, we can add an extra partition column called version_id to the sales_stage table. Table properties naming the primary key columns and the version_id column can also be added; the primary key column is used to identify a record in the existing data.
Following this approach leaves the existing sales data largely untouched, since new and updated records are written to new partitions.
So, the updated code will look like this:

Step 1: The sales stage table will look like this.

CREATE EXTERNAL TABLE sales_stage
(
sell_id BIGINT,
acct_id BIGINT,
payment_type_id STRING,
status_id STRING,
total_amt DOUBLE,
batch INT,
op_code STRING
)
PARTITIONED BY (store_id STRING, create_date STRING, version_id INT)
LOCATION '/user/ldap/hivetest/data/sales_stage'
TBLPROPERTIES ('op_code_column' = 'op_code', 'version_id_column' = 'version_id',
'primary_key_columns' = 'sell_id');

Step 2: The sales table will look like this.

CREATE EXTERNAL TABLE sales
(
sell_id BIGINT,
acct_id BIGINT,
payment_type_id STRING,
status_id STRING,
total_amt DOUBLE,
batch INT
)
PARTITIONED BY (store_id STRING, create_date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS RCFILE
LOCATION '/user/ldap/hivetest/data/sales';

Step 3: To load the updates, the query will look something like this.

INSERT OVERWRITE TABLE sales PARTITION(store_id, create_date)
SELECT sell_id, acct_id, payment_type_id, status_id, total_amt, batch, store_id, create_date
FROM sales_stage;
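
The stage table can hold several versions of the same record spread across different version_id partitions, so a deduplication step is still needed when the latest state of each record is read. The query below is only a sketch of one possible way to pick the latest version per primary key (it is not part of the original post); it mirrors the update_view logic from the earlier approach, using version_id instead of the timestamp:

SELECT t.*
FROM sales_stage t
JOIN
(SELECT sell_id, max(version_id) AS max_version
FROM sales_stage
GROUP BY sell_id) v
ON t.sell_id = v.sell_id AND t.version_id = v.max_version;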

Transactions in Hive 0.14

Hive 0.14 supports UPDATE & DELETE operations on tables. To enable transactions in Hive 0.14, the following configuration is needed.

Configuration

These configuration parameters must be set appropriately to turn on transaction support in Hive 0.14:

• hive.support.concurrency – true
• hive.enforce.bucketing – true
• hive.exec.dynamic.partition.mode – nonstrict
• hive.txn.manager – org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
• hive.compactor.initiator.on – true (for exactly one instance of the Thrift metastore service)
• hive.compactor.worker.threads – a positive number on at least one instance of the Thrift metastore service

The above configuration can also be set in hive-site.xml, which is located in the conf directory of the Hive installation.


Table Properties

• If a table is to be used in ACID writes (insert, update, delete) then the table property “transactional” must be set on that table, starting with Hive 0.14.0. Without this value, inserts will be done in the old style; updates and deletes will be prohibited. However, this does not apply to Hive 0.13.0.
• Table properties are set with the TBLPROPERTIES clause when a table is created or altered, as described in the Create Table and Alter Table Properties sections of the Hive Data Definition Language. Currently the “transactional” and “NO_AUTO_COMPACTION” table properties are case-sensitive, although that will change in a future release with HIVE-8308.
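
For example, the transactional property can also be set on an existing table with ALTER TABLE (the table name acid_sales below is just a placeholder; the table must additionally satisfy the other ACID requirements, i.e. it must be bucketed and stored as ORC):

ALTER TABLE acid_sales SET TBLPROPERTIES ('transactional'='true');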

Sample Code

SET hive.support.concurrency=true;
SET hive.enforce.bucketing=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;

-- ACID tables must be bucketed and stored as ORC, with 'transactional'='true'
create table test(id int, name string) clustered by (id) into 2 buckets
stored as orc TBLPROPERTIES ('transactional'='true');
insert into table test values(1,'Jim');
select * from test;
-- Bucketing columns (id here) cannot be updated, so the update targets the name column
update test set name='John' where id=1;
select * from test;
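
DELETE is supported in the same way; a minimal continuation of the example above:

delete from test where id=1;
select * from test;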

How do transactions work internally in Hive 0.14?

Hive stores table data in base files on HDFS, and these files cannot be updated in place. Instead, Hive creates a set of delta files for each transaction that alters a table or partition and stores them in a separate delta directory. Occasionally, Hive compacts, or merges, the base and delta files. Hive performs all compactions in the background without affecting concurrent reads and writes of Hive clients. There are two types of compactions:
Minor : Rewrites a set of delta files to a single delta file for a bucket.
Major : Rewrites one or more delta files and the base file as a new base file for a bucket.
By default, Hive automatically compacts delta and base files at regular intervals. Hadoop administrators can tune this behaviour through the compactor-related configuration parameters in hive-site.xml, and can also trigger compactions of base and delta files manually.
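
For example, a manual compaction can be requested with ALTER TABLE ... COMPACT and monitored with SHOW COMPACTIONS (using the test table from the sample code above):

ALTER TABLE test COMPACT 'major';
SHOW COMPACTIONS;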

Limitations of transactions in Hive 0.14

• BEGIN, COMMIT, and ROLLBACK are not yet supported. All language operations are auto-commit. The plan is to support these in a future release.
• Only the ORC file format is supported in this first release. The feature has been built such that transactions can be used by any storage format that can determine how updates or deletes apply to base records (basically, one that has an explicit or implicit row id), but so far the integration work has only been done for ORC.
• Tables must be bucketed to make use of these features. Tables in the same system that do not use transactions and ACID do not need to be bucketed.

Blog By:-

Gargi Chatterji
