#onenote# hive

 

The Metastore

The metastore is the central repository of Hive metadata. It is divided into two pieces: a service and the backing store for the data. By default, the metastore service runs in the same JVM as the Hive service and contains an embedded Derby database instance backed by the local disk. This is called the embedded metastore configuration.
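For reference, a minimal hive-site.xml sketch for moving from the embedded Derby setup to a standalone metastore backed by a shared database; the host names, port, and database name are placeholders, not values taken from these notes:

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://metastore-host:9083</value>
  <description>Clients connect to this remote metastore service</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://db-host/hive_metastore</value>
  <description>Backing store used instead of the local Derby instance</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>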

 

Execution engines

Hive was originally written to use MapReduce as its execution engine, and that is still the default. It is now also possible to run Hive using Apache Tez as its execution engine, and work is underway to support Spark too. Both Tez and Spark are general directed acyclic graph (DAG) engines that offer more flexibility and higher performance than MapReduce.

hive> SET hive.execution.engine=tez;

 

Hive, on the other hand, doesn’t verify the data when it is loaded, but rather when a query is issued. This is called schema on read.

In a traditional database, by contrast, the schema is enforced when the data is loaded: schema on write.

HDFS does not provide in-place file updates, so changes resulting from inserts, updates, and deletes are stored in small delta files. Delta files are periodically merged into the base table files by MapReduce jobs that are run in the background by the metastore.
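Compactions can also be triggered by hand; a minimal sketch, assuming a transactional table named txn_table (a hypothetical name, not one used elsewhere in these notes):

ALTER TABLE txn_table COMPACT 'minor';
ALTER TABLE txn_table COMPACT 'major';   -- rewrites the base and delta files into a new base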

Hive also has support for table- and partition-level locking. Locks are managed transparently using ZooKeeper.

Hive indexes can speed up queries in certain cases. There are currently two index types: compact and bitmap.

Compact indexes store the HDFS block numbers of each value, rather than each file offset, so they don’t take up much disk space but are still effective for the case where values are clustered together in nearby rows. Bitmap indexes use compressed bitsets to efficiently store the rows that a particular value appears in, and they are usually appropriate for low-cardinality columns (such as gender or country).

Table 17-2. A high-level comparison of SQL and HiveQL

| Feature | SQL | HiveQL | References |
| --- | --- | --- | --- |
| Updates | UPDATE, INSERT, DELETE | UPDATE, INSERT, DELETE | Inserts; Updates, Transactions, and Indexes |
| Transactions | Supported | Limited support | |
| Indexes | Supported | Supported | |
| Data types | Integral, floating-point, fixed-point, text and binary strings, temporal | Boolean, integral, floating-point, fixed-point, text and binary strings, temporal, array, map, struct | Data Types |
| Functions | Hundreds of built-in functions | Hundreds of built-in functions | Operators and Functions |
| Multitable inserts | Not supported | Supported | Multitable insert |
| CREATE TABLE…AS SELECT | Not valid SQL-92, but found in some databases | Supported | CREATE TABLE…AS SELECT |
| SELECT | SQL-92 | SQL-92. SORT BY for partial ordering, LIMIT to limit number of rows returned | Querying Data |
| Joins | SQL-92, or variants (join tables in the FROM clause, join condition in the WHERE clause) | Inner joins, outer joins, semi joins, map joins, cross joins | Joins |
| Subqueries | In any clause (correlated or noncorrelated) | In the FROM, WHERE, or HAVING clauses (uncorrelated subqueries not supported) | Subqueries |
| Views | Updatable (materialized or nonmaterialized) | Read-only (materialized views not supported) | Views |
| Extension points | User-defined functions, stored procedures | User-defined functions, MapReduce scripts | User-Defined Functions; MapReduce Scripts |

 

Table 17-3. Hive data types

| Category | Type | Description | Literal examples |
| --- | --- | --- | --- |
| Primitive | BOOLEAN | True/false value. | TRUE |
| | TINYINT | 1-byte (8-bit) signed integer, from -128 to 127. | 1Y |
| | SMALLINT | 2-byte (16-bit) signed integer, from -32,768 to 32,767. | 1S |
| | INT | 4-byte (32-bit) signed integer, from -2,147,483,648 to 2,147,483,647. | 1 |
| | BIGINT | 8-byte (64-bit) signed integer, from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807. | 1L |
| | FLOAT | 4-byte (32-bit) single-precision floating-point number. | 1.0 |
| | DOUBLE | 8-byte (64-bit) double-precision floating-point number. | 1.0 |
| | DECIMAL | Arbitrary-precision signed decimal number. | 1.0 |
| | STRING | Unbounded variable-length character string. | 'a', "a" |
| | VARCHAR | Variable-length character string. | 'a', "a" |
| | CHAR | Fixed-length character string. | 'a', "a" |
| | BINARY | Byte array. | Not supported |
| | TIMESTAMP | Timestamp with nanosecond precision. | 1325502245000, '2012-01-02 03:04:05.123456789' |
| | DATE | Date. | '2012-01-02' |
| Complex | ARRAY | An ordered collection of fields. The fields must all be of the same type. | array(1, 2) [a] |
| | MAP | An unordered collection of key-value pairs. Keys must be primitives; values may be any type. For a particular map, the keys must be the same type, and the values must be the same type. | map('a', 1, 'b', 2) |
| | STRUCT | A collection of named fields. The fields may be of different types. | struct('a', 1, 1.0), [b] named_struct('col1', 'a', 'col2', 1, 'col3', 1.0) |
| | UNION | A value that may be one of a number of defined data types. The value is tagged with an integer (zero-indexed) representing its data type in the union. | create_union(1, 'a', 63) |

[a] The literal forms for arrays, maps, structs, and unions are provided as functions. That is, array, map, struct, and create_union are built-in Hive functions.

[b] The columns are named col1, col2, col3, etc.

     

 

 

Managed Tables

  • By default, Hive will manage the data, which means that Hive moves the data into its warehouse directory.
  • DROP TABLE managed_table; <-- the table, including its metadata and its data, is deleted.

CREATE TABLE managed_table (dummy STRING);

LOAD DATA INPATH '/user/tom/data.txt' INTO TABLE managed_table;

 

External Tables

  • An external table tells Hive to refer to data that is at an existing location outside the warehouse directory.
  • When you drop an external table, Hive will leave the data untouched and only delete the metadata.
  • You can create the data lazily after creating the table.

CREATE EXTERNAL TABLE external_table (dummy STRING)

LOCATION '/user/tom/external_table';

View

In Hive, a view is not materialized to disk when it is created; rather, the view’s SELECT statement is executed when the statement that refers to the view is run. If a view performs extensive transformations on the base tables or is used frequently, you may choose to manually materialize it by creating a new table that stores the contents of the view (see CREATE TABLE…AS SELECT).
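A minimal sketch (the view name, base table, and predicate are made up for illustration):

hive> CREATE VIEW valid_records AS
    > SELECT * FROM records WHERE temperature != 9999;
hive> CREATE TABLE valid_records_snapshot AS SELECT * FROM valid_records;   -- manual materialization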

 

Pig vs Hive

Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turn enables them to handle very large data sets.

At the present time, Pig’s infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs, for which large-scale parallel implementations already exist (e.g., the Hadoop subproject). Pig’s language layer currently consists of a textual language called Pig Latin, which has the following key properties:

  • Ease of programming. It is trivial to achieve parallel execution of simple, “embarrassingly parallel” data analysis tasks. Complex tasks comprised of multiple interrelated data transformations are explicitly encoded as data flow sequences, making them easy to write, understand, and maintain.
  • Optimization opportunities. The way in which tasks are encoded permits the system to optimize their execution automatically, allowing the user to focus on semantics rather than efficiency.
  • Extensibility. Users can create their own functions to do special-purpose processing.

 

The Apache Hive ™ data warehouse software facilitates querying and managing large datasets residing in distributed storage. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time this language also allows traditional map/reduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL.

 

http://stackoverflow.com/questions/3356259/difference-between-pig-and-hive-why-have-both

Pig: a dataflow language and environment for exploring very large datasets.

Hive: a distributed data warehouse

You can achieve similar results with pig/hive queries. The main difference lies within approach to understanding/writing/creating queries.

Pig tends to create a flow of data: small steps where in each you do some processing

Hive gives you a SQL-like language to operate on your data, so the transition from an RDBMS is much easier (Pig can be easier for someone with no prior SQL experience).

It is also worth noting that with Hive you get a nice interface to work with the data (Beeswax in HUE, or the Hive web interface), and it also gives you a metastore holding information about your data (schema, etc.), which is useful as a central source of information about your data.

 

Schema Design

Bucketing Table Data Storage

Partitions offer a convenient way to segregate data and to optimize queries. However, not all data sets lead to sensible partitioning, especially given the concerns raised earlier about appropriate sizing.

 

Bucketing is another technique for decomposing data sets into more manageable parts.

 

By default, Hive limits the maximum number of dynamic partitions that may be created, to prevent the extreme case where a single statement creates a huge number of partitions.

 

Instead, if we bucket the weblog table and use user_id as the bucketing column, the value of this column will be hashed into a user-defined number of buckets. Records with the same user_id will always be stored in the same bucket. Assuming the number of users is much greater than the number of buckets, each bucket will have many users:

hive> CREATE TABLE weblog (user_id INT, url STRING, source_ip STRING)

> PARTITIONED BY (dt STRING)

> CLUSTERED BY (user_id) INTO 96 BUCKETS;
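One payoff of bucketing is efficient sampling on the clustered column; a sketch against the table above (the partition value is illustrative):

hive> SELECT * FROM weblog TABLESAMPLE(BUCKET 1 OUT OF 96 ON user_id)
    > WHERE dt = '2011-01-02';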

 

 

 

Table-by-Day

Table-by-day is a pattern where a table named supply is appended with a timestamp such as supply_2011_01_01, supply_2011_01_02, etc.

 

 

With Hive, a partitioned table should be used instead. Hive uses expressions in the WHERE clause to select input only from the partitions needed for the query. This query will run efficiently, and it is clean and easy on the eyes:

 

hive> CREATE TABLE supply (id int, part string, quantity int)

> PARTITIONED BY (day int);

hive> ALTER TABLE supply ADD PARTITION (day=20110102);
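The kind of query the paragraph above refers to, as a sketch (the predicate values are illustrative):

hive> SELECT part, quantity FROM supply
    > WHERE day = 20110102 AND quantity < 4;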

 

 

Over Partitioning

 

HDFS was designed for many millions of large files, not billions of small files. The first drawback of having too many partitions is the large number of Hadoop files and directories that are created unnecessarily.

 

MapReduce processing converts a job into multiple tasks. In the default case, each task is a new JVM instance, requiring the overhead of start-up and tear-down.

 

 

An ideal partition scheme should not result in too many partitions and directories, and the files in each directory should be large, some multiple of the filesystem block size.

 

A good strategy for time-range partitioning, for example, is to determine the approximate size of your data accumulation over different granularities of time, and start with the granularity that results in "modest" growth in the number of partitions over time, while each partition contains files at least on the order of the filesystem block size or multiples thereof. This balancing keeps the partitions large, which optimizes throughput for the general-case query.

 

 

Another solution is to use two levels of partitions along different dimensions. For example, the first partition might be by day and the second-level partition might be by geographic region, like the state:

hive> CREATE TABLE weblogs (url string, time bigint, city string)

> PARTITIONED BY (day int, state string);

hive> SELECT * FROM weblogs WHERE day=20110102;

 

Using Columnar Tables

Hive usually stores data in row-oriented formats such as delimited text, but columnar formats (RCFile, ORC, Parquet) can be much more efficient when a table has many columns and most queries read only a few of them, or when columns contain many repeated values that compress well.
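A minimal sketch of a columnar table definition (the table and column names are made up; ORC is just one columnar option):

hive> CREATE TABLE weblogs_orc (url STRING, ts BIGINT, city STRING)
    > PARTITIONED BY (day INT)
    > STORED AS ORC;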

 

(Almost) Always Use Compression!

In almost all cases, compression makes data smaller on disk, which usually makes queries faster by reducing I/O overhead. Hive works seamlessly with many compression types. The only compelling reason not to use compression is when the data produced is intended for use by an external system, and an uncompressed format, such as text, is the most compatible.
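A sketch of the usual session-level switches; the Snappy codec choice is an assumption, not something prescribed above:

SET hive.exec.compress.intermediate=true;   -- compress data passed between MapReduce stages
SET hive.exec.compress.output=true;         -- compress final job output
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;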

 

Hive & HBase Integration

 

Consider that you work with an RDBMS and have to choose between full table scans and index access – but only one of them.

If you would choose full table scans, use Hive. If index access, use HBase.

 

HBase is like a map: if you know the key, you can instantly get the value. But if you want to know, say, how many integer keys in HBase are between 1000000 and 2000000, that is not a job for HBase alone. If you have data to be aggregated, rolled up, or analyzed across rows, then consider Hive.

https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration

In order to create a new HBase table which is to be managed by Hive, use the STORED BY clause on CREATE TABLE:

CREATE TABLE hbase_table_1(key int, value string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
TBLPROPERTIES ("hbase.table.name" = "xyz", "hbase.mapred.output.outputtable" = "xyz");

 

 

If you want to give Hive access to an existing HBase table, use CREATE EXTERNAL TABLE:
CREATE EXTERNAL TABLE hbase_table_2(key int, value string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf1:val")
TBLPROPERTIES("hbase.table.name" = "some_existing_table", "hbase.mapred.output.outputtable" = "some_existing_table");

 

 

e.g.

 

-- forest_hive_hbase_table  <-- table in HBase

-- hive_hbase_table  <-- table seen in Hive

-- this SQL is executed in Hive

CREATE TABLE hive_hbase_table

(rowkey string, jsonText string)

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'

WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:jsonText", "hbase.table.default.storage.type" = "binary", "hbase.columnfamily.cf.compression" = "SNAPPY")

TBLPROPERTIES ("hbase.table.name" = "forest_hive_hbase_table");

 

Pasted from <https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration>

 

 

Command

      1. Show database with like

show databases like '*20160131';

or

show schemas like '*20160131';

 

      1. Execute a command without a CLI

hive -e "select * from camp_eg_uat_20160318.rtl_viol_rollup_batch_log"

      1. Run command in a file

hive -f test.hql

      1. Functions

hive> show functions;

hive> desc function substr;

 

      1. hive> SHOW PARTITIONS logs;   <-- show the partitions of table logs
        dt=2001-01-01/country=GB
        dt=2001-01-01/country=US
        dt=2001-01-02/country=GB
        dt=2001-01-02/country=US
      2. PARTITIONS
        1. Table ddl
        2. CREATE TABLE logs (ts BIGINT, line STRING)
          PARTITIONED BY (dt STRING, country STRING);

 

      1. Load data

LOAD DATA LOCAL INPATH 'input/hive/partitions/file1'
INTO TABLE logs
PARTITION (dt='2001-01-01', country='GB');

 

      1. The directory structure might look like this:

/user/hive/warehouse/logs
├── dt=2001-01-01/
│   ├── country=GB/
│   │   ├── file1
│   │   └── file2
│   └── country=US/
│       └── file3
└── dt=2001-01-02/
    ├── country=GB/
    │   └── file4
    └── country=US/
        ├── file5
        └── file6

 

 

      1. Buckets

CREATE TABLE bucketed_users (id INT, name STRING)

CLUSTERED BY (id) SORTED BY (id ASC) INTO 4 BUCKETS;

 

hive> dfs -ls /user/hive/warehouse/bucketed_users;

shows that four files were created, with the following names (the names are generated by Hive):

000000_0

000001_0

000002_0

000003_0

 

 

      1. DESCRIBE FORMATTED/EXTENDED shows the data definition of a table in Hive.

hive> DESCRIBE FORMATTED dbname.tablename;

 

 

      1. Show ddl

show create table wgdc_ukg9_uip_users;

 

      1. Delete all the tables (from a Linux shell)

hive -e 'show tables' | xargs -I '{}' hive -e 'drop table {}'

 

      1. Explain

The explain output has three parts:

The Abstract Syntax Tree for the query

The dependencies between the different stages of the plan

The description of each of the stages

explain  select * from pageviews;

 

      1.  Enabling Tez for Hive Queries

set hive.execution.engine=tez;

      1. Enable CBO

A CBO generates more efficient query plans. In Hive, the CBO is enabled by default, but it requires column statistics to be generated for tables. Column statistics can be expensive to compute, so they are not automated. Hive has a CBO based on Apache Calcite as well as an older physical optimizer. All of the optimizations are being migrated to the CBO. The physical optimizer performs better with statistics, but the CBO requires statistics.

 

set hive.compute.query.using.stats=true;

set hive.stats.fetch.column.stats = true;

set hive.stats.fetch.partition.stats = true;

set hive.cbo.enable = true;
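The statistics themselves have to be computed explicitly; a sketch, using a placeholder table name:

ANALYZE TABLE my_table COMPUTE STATISTICS;               -- basic table-level statistics
ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS;   -- column statistics used by the CBO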

 

 

  • Start metastore

 

hive --service metastore

 

DDL  SQL

      1. EXTERNAL TABLE

CREATE EXTERNAL TABLE page_view(viewTime INT, userid BIGINT,

page_url STRING, referrer_url STRING,

ip STRING COMMENT 'IP Address of the User',

country STRING COMMENT 'country of origination')

COMMENT 'This is the staging page view table'

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054'

STORED AS TEXTFILE

LOCATION '<hdfs_location>';

 

      1. Partitioned  table

create table table_name (

id                int,

dtDontQuery       string,

name              string

)

partitioned by (date string)  <-- the partition column is not stored in the data files; it is derived from the partition directory

 

 

CREATE TABLE students (name VARCHAR(64), age INT, gpa DECIMAL(3, 2))

CLUSTERED BY (age) INTO 2 BUCKETS STORED AS ORC;

 

INSERT INTO TABLE students

VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);

 

  1. BUCKET

CREATE TABLE pageviews (userid VARCHAR(64), link STRING, came_from STRING)

PARTITIONED BY (datestamp STRING) CLUSTERED BY (userid) INTO 256 BUCKETS STORED AS ORC;

 

INSERT INTO TABLE pageviews PARTITION (datestamp = '2014-09-23')

VALUES ('jsmith', 'mail.com', 'sports.com'), ('jdoe', 'mail.com', null);

 

INSERT INTO TABLE pageviews PARTITION (datestamp)

VALUES ('tjohnson', 'sports.com', 'finance.com', '2014-09-23'), ('tlee', 'finance.com', null, '2014-09-21');
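The second INSERT above leaves datestamp unspecified (dynamic partitioning), which normally needs these session settings first:

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;   -- allow every partition column to be dynamic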

—————————-

CREATE TABLE page_view(viewTime INT, userid BIGINT,

page_url STRING, referrer_url STRING,

ip STRING COMMENT 'IP Address of the User')

COMMENT 'This is the page view table'

PARTITIONED BY(dt STRING, country STRING)

CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '\001'

COLLECTION ITEMS TERMINATED BY '\002'

MAP KEYS TERMINATED BY '\003'

STORED AS SEQUENCEFILE;

 

the page_view table is bucketed (clustered by) userid and within each bucket the data is sorted in increasing order of viewTime. Such an organization allows the user to do efficient sampling on the clustered column – in this case userid. The sorting property allows internal operators to take advantage of the better-known data structure while evaluating queries, also increasing efficiency. MAP KEYS and COLLECTION ITEMS keywords can be used if any of the columns are lists or maps

 

The command set hive.enforce.bucketing = true; allows the correct number of reducers and the cluster by column to be automatically selected based on the table. Otherwise, you would need to set the number of reducers to be the same as the number of buckets as in set mapred.reduce.tasks = 256; and have a CLUSTER BY … clause in the select.
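A sketch of populating the bucketed page_view table above; raw_page_view is a hypothetical unbucketed staging table with the same columns, and the partition values are made up:

SET hive.enforce.bucketing = true;
FROM raw_page_view
INSERT OVERWRITE TABLE page_view PARTITION (dt='2008-06-08', country='US')
  SELECT viewTime, userid, page_url, referrer_url, ip;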

 

 

 

 

      1. Create table by select query

CREATE TABLE new_key_value_store

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'

STORED AS RCFile

AS

SELECT (key % 1024) new_key, concat(key, value) key_value_pair

FROM key_value_store

SORT BY new_key, key_value_pair;

      1. Create table like (the same as source table)

CREATE TABLE empty_key_value_store LIKE key_value_store;

 

 

 

  • Inserting data into Hive Tables from queries

 

 

INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 …) [IF NOT EXISTS]] select_statement1 FROM from_statement;

INSERT OVERWRITE will overwrite any existing data in the table or partition

 

INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 …)] select_statement1 FROM from_statement;

INSERT INTO will append to the table or partition, keeping the existing data intact.

 

      1. Drop PARTITION

ALTER TABLE logs DROP IF EXISTS PARTITION(year = 2012, month = 12, day = 18);

ALTER TABLE pageviews DROP IF EXISTS PARTITION(datestamp = '2014-09-21');

 

      1. Truncate partition

TRUNCATE TABLE pageviews PARTITION (datestamp = '2014-09-21');

 

      1. Loading files into tables

 

LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 …)]

filepath can be:

      • a relative path, such as project/data1
      • an absolute path, such as /user/hive/project/data1
      • a full URI with scheme and (optionally) an authority, such as hdfs://namenode:9000/user/hive/project/data1

 

If the keyword LOCAL is specified, then:

the load command will look for filepath in the local file system. If a relative path is specified, it will be interpreted relative to the user’s current working directory. The user can specify a full URI for local files as well – for example: file:///user/hive/project/data1

 

If the keyword LOCAL is not specified, Hive uses the full URI of filepath if one is given; otherwise the path is resolved against the default filesystem.

 

Eg.

LOAD DATA local INPATH '/hsbc/babar/sit/forest/hive/SensorFiles/HVAC.csv' OVERWRITE INTO TABLE hvac;

LOAD DATA local INPATH '/hsbc/babar/sit/forest/hive/SensorFiles/building.csv' OVERWRITE INTO TABLE building;

 

 

      1. CompressedStorage

You can import text files compressed with Gzip or Bzip2 directly into a table stored as TextFile. The compression will be detected automatically and the file will be decompressed on-the-fly during query execution. For example:

 

CREATE TABLE raw (line STRING)

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n';

 

LOAD DATA LOCAL INPATH '/tmp/weblogs/20090603-access.log.gz' INTO TABLE raw;

 

The recommended practice is to insert data into another table, which is stored as a SequenceFile. A SequenceFile can be split by Hadoop and distributed across map jobs whereas a GZIP file cannot be. For example:

CREATE TABLE raw (line STRING)

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n';

 

CREATE TABLE raw_sequence (line STRING)

STORED AS SEQUENCEFILE;

 

LOAD DATA LOCAL INPATH '/tmp/weblogs/20090603-access.log.gz' INTO TABLE raw;

 

SET hive.exec.compress.output=true;

SET io.seqfile.compression.type=BLOCK; -- NONE/RECORD/BLOCK

INSERT OVERWRITE TABLE raw_sequence SELECT * FROM raw;

 

      1. Index

 

CREATE INDEX hvac_compact_idx ON TABLE hvac (recorddate) AS 'COMPACT' WITH DEFERRED REBUILD;

ALTER INDEX hvac_compact_idx ON hvac REBUILD;

SHOW FORMATTED INDEX ON hvac;

DROP INDEX hvac_compact_idx ON hvac;

 

-- bitmap index -> for columns with few distinct values

CREATE INDEX hvac_bitmap_idx ON TABLE hvac (system) AS 'BITMAP' WITH DEFERRED REBUILD;

ALTER INDEX hvac_bitmap_idx ON hvac REBUILD;

SHOW FORMATTED INDEX ON hvac;

DROP INDEX hvac_bitmap_idx ON hvac;

 

 

 

Performance

      1. Hive cluster by vs order by vs sort by

 

The longer version:

 

ORDER BY x:

guarantees global ordering, but does this by pushing all data through just one reducer. This is basically unacceptable for large datasets. You end up with one sorted file as output.

 

SORT BY x:

orders data at each of N reducers, but each reducer can receive overlapping ranges of data. You end up with N or more sorted files with overlapping ranges.

 

DISTRIBUTE BY x:

ensures each of N reducers gets non-overlapping ranges of x, but doesn't sort the output of each reducer. You end up with N or more unsorted files with non-overlapping ranges.

 

CLUSTER BY x:

ensures each of N reducers gets non-overlapping ranges, then sorts by those ranges at the reducers. This gives you global ordering, and is the same as doing (DISTRIBUTE BY x and SORT BY x). You end up with N or more sorted files with non-overlapping ranges.

 

Make sense? So CLUSTER BY is basically the more scalable version of ORDER BY.
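A quick sketch of that equivalence, reusing the weblog table from earlier as a stand-in:

SELECT * FROM weblog DISTRIBUTE BY user_id SORT BY user_id;
-- produces the same layout as:
SELECT * FROM weblog CLUSTER BY user_id;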

 

 

 

  • Data skew

 

Data skew refers to the situation where, because the data is unevenly distributed, a few values account for most of the records; combined with Hadoop's computation model, this leads to uneven use of compute resources and degraded performance.

 

Take website access logs as an example. Suppose the log records each user's user_id, using the user_id from the users table for registered users and user_id=0 for anonymous users. Since most visitors are anonymous (they only read, never write), user_id=0 accounts for the vast majority of records. If a computation then uses user_id as the GROUP BY dimension or as a join key, one particular reducer receives far more data than the others, because it has to process every user_id=0 record; its performance becomes very poor, and it is still running long after all the other reducers have finished.

Skew caused by GROUP BY and skew caused by JOIN need to be looked at separately.

Two parameters address GROUP BY skew. One is hive.map.aggr, which already defaults to true and means a map-side combiner is applied; so if your GROUP BY query only does count(*), you will hardly see the skew, whereas count(distinct) will still show some of it. The other is hive.groupby.skewindata, which means that during the reduce phase, rows with the same key are not all sent to the same reducer but are distributed randomly; the reducers pre-aggregate, and then a second MapReduce round computes the final result from the partially aggregated data. This parameter does roughly the same thing as hive.map.aggr, only on the reduce side, and it launches an extra job, so it is not really recommended and the effect is limited.

If you want to optimize by rewriting the SQL, you can do it like this:

/* before the rewrite */
select a, count(distinct b) as c from tbl group by a;

/* after the rewrite */
select a, count(*) as c
from (select distinct a, b from tbl) t group by a;

For skew caused by a join, take the join between the access log table and the users table described above:

select a.* from logs a join users b on a.user_id = b.user_id;

Hive's solution is called skew join. The idea is that special values such as user_id = 0 are not computed on the reduce side; instead they are first written to HDFS, and then an extra round of map join is launched just for those special values, in the hope of speeding up their processing. Of course, you have to tell Hive that this join is a skew join:

set hive.optimize.skewjoin = true;

You also have to tell Hive how to recognize a special value: based on hive.skewjoin.key, Hive treats any key with more records than that threshold (default 100000) as a special value.

(A diagram of the skew join flow appears in the original article.)

 

In addition, how special values are handled often depends on the business, so you can also rewrite the SQL from a business standpoint. For the skewed join above, you can isolate the special value (from a business point of view the users table should not contain user_id = 0, but we assume here that it does, which makes the rewrite more general):

select t.* from
(
  select a.*
  from (select * from logs where user_id = 0) a
  join (select * from users where user_id = 0) b
  on a.user_id = b.user_id
  union all
  select a.*
  from logs a join users b
  on a.user_id <> 0 and a.user_id = b.user_id
) t;

Data skew is not just a Hive problem; it is a data distribution problem that any shared-nothing architecture inevitably runs into, and there is dedicated academic research on it, for example SkewTune.

 

Pasted from <http://www.csdn.net/article/2015-01-13/2823530>

 

Configuring Hive

A number of configuration variables in Hive can be used by the administrator to change the behavior for their installations and user sessions. These variables can be configured in any of the following ways, shown in the order of preference:

 

    • Using the set command in the CLI or Beeline for setting session level values for the configuration variable for all statements subsequent to the set command. For example, the following command sets the scratch directory (which is used by Hive to store temporary output and plans) to /tmp/mydir for all subsequent statements:
      set hive.exec.scratchdir=/tmp/mydir;
    • Using the --hiveconf option of the hive command (in the CLI) or beeline command for the entire session. For example:
      bin/hive --hiveconf hive.exec.scratchdir=/tmp/mydir
    • In hive-site.xml. This is used for setting values for the entire Hive configuration (see hive-site.xml and hive-default.xml.template below). For example:
      <property>
      <name>hive.exec.scratchdir</name>
      <value>/tmp/mydir</value>
      <description>Scratch space for Hive jobs</description>
      </property>
    • In server-specific configuration files (supported starting in Hive 0.14). You can set metastore-specific configuration values in hivemetastore-site.xml, and HiveServer2-specific configuration values in hiveserver2-site.xml.
  • Hive optimization principles

    After using Hive for a while, I find what the original poster said to be very accurate.

    Basic principles:

    1: Filter data as early as possible to reduce the amount of data at each stage; add partition predicates for partitioned tables, and select only the columns you actually need.

    select ... from A

    join B

    on A.key = B.key

    where A.userid > 10

      and B.userid < 10

      and A.dt = '20120417'

      and B.dt = '20120417';

    should be rewritten as:

    select ... from (select ... from A

                     where dt = '20120417'

                       and userid > 10

                    ) a

    join (select ... from B

          where dt = '20120417'

            and userid < 10

         ) b

    on a.key = b.key;

    2: Keep operations atomic; avoid packing complex logic into a single SQL statement.

    Intermediate tables can be used to carry out the complex logic:

    drop table if exists tmp_table_1;

    create table if not exists tmp_table_1 as

    select ......;

 

    drop table if exists tmp_table_2;

    create table if not exists tmp_table_2 as

    select ......;

 

    drop table if exists result_table;

    create table if not exists result_table as

    select ......;

 

    drop table if exists tmp_table_1;

    drop table if exists tmp_table_2;

     

     

    3: Try to keep the number of jobs launched by a single SQL statement under 5.

     

    4: Use map join with care. In general only tables with fewer than about 2000 rows and smaller than 1 MB (this can be relaxed somewhat after cluster expansion) qualify, and the small table should be placed on the left side of the join (at present in TCL many small tables are placed on the right side).

    Otherwise it will cause heavy disk and memory consumption.

     

    5: Before writing SQL, understand the characteristics of the data itself. If there are join or group operations, watch out for data skew.

    If data skew appears, handle it as follows:

    set hive.exec.reducers.max=200;

    set mapred.reduce.tasks=200;  -- increase the number of reducers

    set hive.groupby.mapaggr.checkinterval=100000;  -- a GROUP BY key with more records than this is split; set according to the actual data volume

    set hive.groupby.skewindata=true;  -- set to true if the skew occurs in a GROUP BY

    set hive.skewjoin.key=100000;  -- a join key with more records than this is split; set according to the actual data volume

    set hive.optimize.skewjoin=true;  -- set to true if the skew occurs in a join

     

    6: If a UNION ALL has more than two branches, or each branch is large, split it into multiple INSERT INTO statements; in practical tests this improved execution time by 50%.

    insert overwrite table tablename partition (dt= ....)

    select ..... from (

        select ... from A

        union all

        select ... from B

        union all

        select ... from C

    ) R

    where ...;

     

    can be rewritten as:

    insert into table tablename partition (dt= ....)

    select .... from A

    WHERE ...;

 

    insert into table tablename partition (dt= ....)

    select .... from B

    WHERE ...;

 

    insert into table tablename partition (dt= ....)

    select .... from C

    WHERE ...;

    Pasted from <http://blog.csdn.net/an342647823/article/details/25703429>

    Index

    HIVE OPTIMIZATIONS WITH INDEXES, BLOOM-FILTERS AND STATISTICS

    https://snippetessay.wordpress.com/2015/07/25/hive-optimizations-with-indexes-bloom-filters-and-statistics/

    The implementation of CompactIndex resembles a lookup table rather than the B-tree of a traditional database. If you build an index on col1 of table A, the index file is itself a table with three columns: the distinct values of col1, the data file each value appears in, and the offsets within that file. In this way the amount of data a query has to read can be reduced (the offset tells you where to start looking, so only the relevant blocks need to be located), which saves resources. In terms of performance, however, the improvement is not large and may not even pay back the time spent building the index, so when cluster resources are plentiful there is little reason to bother with indexes.

    Another drawback of CompactIndex is that it is unfriendly to use: after the index is built, you still have to prune it yourself according to the query predicate before it can be used, the internal structure of the index is completely exposed, and extra time is spent. The usage below makes this concrete:

create index idx on table index_test_table(id)
as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' with deferred rebuild;
alter index idx on index_test_table rebuild;

-- unlike an RDBMS, the index table has to be pruned manually according to the query predicate
create table my_index
as select _bucketname, _offsets
from default__index_test_table_idx__ where id = 10;

-- point Hive at the pruned index, then run the query
set hive.index.compact.file = /user/hive/warehouse/my_index;
set hive.input.format = org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
select count(*) from index_test_table where id = 10;

    Bloom Filter Index

    Bloom filters are added to ORC indexes from Hive 1.2.0 onwards. Predicate pushdown can make use of bloom filters to better prune the row groups that do not satisfy the filter condition. The bloom filter indexes consist of a BLOOM_FILTER stream for each column specified through the orc.bloom.filter.columns table property. A BLOOM_FILTER stream records a bloom filter entry for each row group (by default 10,000 rows) in a column. Only the row groups that satisfy the min/max row index evaluation are evaluated against the bloom filter index.

    Bloom filters are suitable for queries using WHERE together with the = operator:

    SELECT AVG(revenue) FROM CUSTOMER WHERE gender=0;

    A bloom filter can apply to numeric but also non-numeric (categorical) data, which is an advantage over the storage index. Internally, a bloom filter is a hash-based summary of the data in a column in a given block. This means you can "ask" a bloom filter whether it contains a certain value, such as gender=male, without needing to read the block at all. This obviously increases performance significantly, because most of the time a database is busy reading blocks that are not relevant to a query.

    CREATE TABLE CUSTOMER (

    customerId int,

    gender tinyint,

    age tinyint,

    revenue decimal(10,2),

    name varchar(100),

    customerCategory int )

    STORED AS ORC

    TBLPROPERTIES

    ( 'orc.compress'='SNAPPY',

    'orc.create.index'='true', 'orc.bloom.filter.columns'='gender',

    'orc.bloom.filter.fpp'='0.05',

    'orc.stripe.size'='268435456',

    'orc.row.index.stride'='10000');

    Delete and update in Hive

  • Error: Error while compiling statement: FAILED: SemanticException [Error 10294]: Attempt to do update or delete using transaction manager that does not support these operations.

    As mentioned in the Apache Hive documentation on ACID limitations, for ACID support the table must be stored as ORC, be bucketed, and not be sorted, with the transactional property enabled (see the sketch below).

    https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions#HiveTransactions-Limitations
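A sketch of a table and session setup that satisfies those limitations; the table name, column names, and bucket count are made up, while the settings are the standard ones for the DbTxnManager:

SET hive.support.concurrency=true;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.exec.dynamic.partition.mode=nonstrict;

CREATE TABLE acid_demo (id INT, value STRING)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

UPDATE acid_demo SET value = 'x' WHERE id = 1;
DELETE FROM acid_demo WHERE id = 2;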
