Apache Kudu Zbigniew Baranowski




What is KUDU?
- New storage engine for structured data (tables) – does not use HDFS!
- Columnar store
- Mutable (insert, update, delete)
- Written in C++
- Apache-licensed – open source
- Quite new – version 1.0 recently released (first commit on October 11th, 2012) – and immature?

KUDU tries to fill the gap
- HDFS excels at:
  – scanning large amounts of data at speed
  – accumulating data with high throughput
- HBase (on HDFS) excels at:
  – fast random lookups by key
  – making data mutable
- Kudu aims to offer both in one system

Table oriented storage
- A Kudu table has an RDBMS-like schema:
  – primary key (one or many columns), no secondary indexes
  – finite and constant number of columns (unlike HBase)
  – each column has a name and a type: boolean, int(8,16,32,64), float, double, timestamp, string, binary
- Horizontally partitioned (range, hash)
  – partitions are called tablets
  – tablets can have 3 or 5 replicas
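As a rough illustration of hash partitioning (a sketch only – Kudu's real hash function is internal and not this one), every row with the same partition-key value maps deterministically to one of a fixed number of buckets, i.e. tablets:

```java
// Illustrative sketch of hash partitioning: rows with equal
// partition-key values always land in the same bucket (tablet),
// which is what makes key lookups single-tablet operations.
public class HashPartitionSketch {
    // Map a partition-key value to one of `buckets` tablets.
    static int bucketFor(long runnumber, int buckets) {
        // Math.floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(Long.hashCode(runnumber), buckets);
    }

    public static void main(String[] args) {
        int buckets = 64; // same bucket count as the DDL example in this deck
        System.out.println(bucketFor(169864L, buckets));
        // Deterministic: the same key always hits the same tablet.
        System.out.println(bucketFor(169864L, buckets) == bucketFor(169864L, buckets));
    }
}
```

The same idea explains why a range scan over the hash column must touch all 64 tablets, while a point lookup touches exactly one.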

Data Consistency
- Writing
  – single-row mutations done atomically across all columns
  – no multi-row ACID transactions
- Reading
  – tuneable freshness of the data: read whatever is available, or wait until all changes committed in the WAL are available
  – snapshot consistency: changes made during scanning are not reflected in the results; point-in-time queries are possible (based on a provided timestamp)
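The snapshot-read idea can be sketched as follows (hypothetical types, not the Kudu API): every mutation carries a commit timestamp, and a scan pinned to timestamp T only sees mutations committed at or before T, so writes that land during the scan are invisible to it.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of MVCC-style snapshot reads: a scan pinned to a snapshot
// timestamp ignores any mutation committed after that timestamp.
public class SnapshotReadSketch {
    record Mutation(String row, long commitTs) {}

    static List<String> scanAt(List<Mutation> log, long snapshotTs) {
        List<String> visible = new ArrayList<>();
        for (Mutation m : log) {
            if (m.commitTs() <= snapshotTs) visible.add(m.row());
        }
        return visible;
    }

    public static void main(String[] args) {
        List<Mutation> log = new ArrayList<>();
        log.add(new Mutation("row1", 10));
        log.add(new Mutation("row2", 20));
        long snapshotTs = 15;              // the scan is pinned here
        log.add(new Mutation("row3", 30)); // committed while the scan runs
        System.out.println(scanAt(log, snapshotTs)); // only row1 is visible
    }
}
```

This is also what makes point-in-time queries possible: re-running the scan with an older timestamp reproduces the older view.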

Kudu simplifies the Big Data deployment model for online analytics (low-latency ingestion and access)
[Diagram: classical low-latency design – stream sources send events to a staging area; events are flushed periodically as big files to HDFS for batch processing, and flushed immediately to an indexed store for fast data access]

Implementing low latency with Kudu
[Diagram: stream sources send events directly to Kudu, which serves both batch processing and fast data access]

Kudu Architecture

Architecture overview
- Master server (can be multiple masters for HA)
  – stores metadata – table definitions
  – tablet directory (tablet locations)
  – coordinates cluster reconfigurations
- Tablet servers (worker nodes)
  – write and read tablets, stored on local disks (no HDFS)
  – track the status of tablet replicas (followers)
  – replicate the data to followers

Tables and tablets
[Diagram: the Master holds the map of table TEST – each tablet (TEST1, TEST2, TEST3) has a leader replica and two follower replicas spread across TabletServer1–TabletServer4]

Data changes propagation in Kudu (Raft Consensus – https://raft.github.io)
[Sequence diagram: the client gets tablet locations from the Master, then sends Write(x rows) to the tablet 1 leader on tablet server X; the leader appends to its WAL and asynchronously sends Write/Commit to the tablet 1 followers on tablet servers Y and Z, which append to their WALs and ACK; once a majority has acknowledged, the leader commits and returns Success to the client]
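The commit rule behind this flow can be sketched in a few lines (a simplification of Raft with a fixed replica set, not the Kudu implementation): the leader acknowledges a write to the client once a strict majority of replicas, counting its own WAL append, have persisted it.

```java
// Sketch of Raft's majority-commit rule with a fixed replica set.
public class RaftCommitSketch {
    // A write is committed once acks (including the leader's own
    // WAL append) reach a strict majority of the replica count.
    static boolean isCommitted(int acks, int replicas) {
        return acks >= replicas / 2 + 1;
    }

    public static void main(String[] args) {
        // 3 replicas: leader + 1 follower ack is enough, so the
        // slowest follower never blocks the client.
        System.out.println(isCommitted(2, 3)); // true
        System.out.println(isCommitted(1, 3)); // false - the leader alone is not a majority
        // 5 replicas tolerate 2 failed replicas:
        System.out.println(isCommitted(3, 5)); // true
    }
}
```

This majority rule is why tablets have 3 or 5 replicas: an odd count always yields an unambiguous majority.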

Insert into tablet (without uniqueness check)
- INSERTs go to the MemRowSet – a B-tree with leaves sorted by primary key
- When full, the MemRowSet is flushed to a DiskRowSet (32MB) – a columnar store encoded similarly to Parquet, with rows sorted by PK
- Each DiskRowSet stores its PK {min, max} and bloom filters for PK ranges, kept in a cached B-tree
- An interval tree keeps track of the PK ranges within DiskRowSets
- There might be thousands of sets per tablet
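The lookup side of this layout can be sketched as follows (hypothetical names, reduced to min/max containment; real Kudu additionally consults per-DiskRowSet bloom filters before touching disk, and uses an interval tree rather than a linear scan):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of pruning DiskRowSets on a key lookup: only sets whose
// [minPk, maxPk] interval contains the key need to be consulted.
public class RowSetPruneSketch {
    record DiskRowSet(String name, long minPk, long maxPk) {}

    static List<String> candidates(List<DiskRowSet> sets, long key) {
        List<String> hits = new ArrayList<>();
        for (DiskRowSet s : sets) {
            // An interval tree answers this containment query in
            // O(log n + matches) instead of this linear scan.
            if (key >= s.minPk() && key <= s.maxPk()) hits.add(s.name());
        }
        return hits;
    }

    public static void main(String[] args) {
        List<DiskRowSet> sets = List.of(
            new DiskRowSet("drs1", 0, 100),
            new DiskRowSet("drs2", 50, 150),   // overlaps drs1
            new DiskRowSet("drs3", 200, 300));
        System.out.println(candidates(sets, 75));  // [drs1, drs2]
        System.out.println(candidates(sets, 250)); // [drs3]
    }
}
```

With thousands of DiskRowSets per tablet, pruning by PK range (and then by bloom filter) is what keeps point lookups cheap.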

DiskRowSet compaction
[Diagram: DiskRowSet1 PK {A, G} and DiskRowSet2 PK {B, E} are compacted into DiskRowSet1 PK {A, D} and DiskRowSet2 PK {E, G}]
- Periodic task
- Removes deleted rows
- Reduces the number of sets with overlapping PK ranges
- Does not create bigger DiskRowSets – the 32MB size of each DRS is preserved
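The benefit of removing range overlap can be sketched numerically (a simplification – real compaction also rewrites the row data and drops deleted rows): before compaction a key may fall in several sets and each must be probed; afterwards the ranges are disjoint and at most one set is probed.

```java
import java.util.List;

// Sketch of why compaction helps lookups: overlapping PK ranges force
// multiple probes per key; disjoint ranges allow at most one.
public class CompactionSketch {
    record Range(char min, char max) {
        boolean contains(char k) { return k >= min && k <= max; }
    }

    // Number of DiskRowSets that must be probed for `key`.
    static long probes(List<Range> sets, char key) {
        return sets.stream().filter(r -> r.contains(key)).count();
    }

    public static void main(String[] args) {
        // Ranges from the diagram: {A,G} and {B,E} overlap...
        List<Range> before = List.of(new Range('A', 'G'), new Range('B', 'E'));
        // ...and compaction rewrites them into disjoint {A,D} and {E,G}.
        List<Range> after  = List.of(new Range('A', 'D'), new Range('E', 'G'));
        System.out.println(probes(before, 'C')); // 2 - key C may live in either set
        System.out.println(probes(after, 'C'));  // 1 - disjoint ranges, single probe
    }
}
```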

How columns are stored on disk (DiskRowSet)
- Each column is stored in its own sequence of 256KB pages (page metadata + values)
- A per-column B-tree index maps row offsets to pages
- A PK B-tree index maps primary keys to row offsets
- Pages are encoded with a variety of encodings, such as dictionary encoding, bitshuffle, or RLE
- Pages can be compressed: Snappy, LZ4 or ZLib
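As an illustration of one of the encodings mentioned, here is a toy run-length encoder (not Kudu's on-disk format): repeated values within a column page collapse to (value, run length) pairs, which is why RLE pays off for sorted or low-cardinality columns.

```java
import java.util.ArrayList;
import java.util.List;

// Toy run-length encoding of a column page: consecutive equal values
// collapse into (value, runLength) pairs.
public class RleSketch {
    record Run(long value, int length) {}

    static List<Run> encode(long[] column) {
        List<Run> runs = new ArrayList<>();
        for (long v : column) {
            int last = runs.size() - 1;
            if (last >= 0 && runs.get(last).value() == v) {
                // Extend the current run.
                runs.set(last, new Run(v, runs.get(last).length() + 1));
            } else {
                // Start a new run.
                runs.add(new Run(v, 1));
            }
        }
        return runs;
    }

    public static void main(String[] args) {
        // A PK-sorted runnumber column: 7 values shrink to 3 runs.
        System.out.println(encode(new long[]{1, 1, 1, 2, 2, 3, 3}));
    }
}
```

Dictionary encoding and bitshuffle exploit the same columnar property from different angles: low cardinality and small bit-level deltas, respectively.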

Kudu deployment

3 options for deployments
- Build from source
- Using RPMs
  – 1 core RPM
  – 2 service RPMs (master and tablet server)
  – one shared config file
- Using Cloudera Manager
  – click, click, click, done

Interfacing with Kudu

Table access and manipulations
- Operations on tables (NoSQL): insert, update, delete, scan
  – Python, C++, Java APIs
- Integrated with:
  – Impala & Hive (SQL), MapReduce, Spark
  – Flume sink (ingestion)

Manipulating Kudu tables with SQL (Impala/Hive)

Table creation:

  CREATE TABLE kudu_example (
    runnumber BIGINT,
    eventnumber BIGINT,
    project STRING,
    streamname STRING,
    prodstep STRING,
    datatype STRING,
    amitag STRING,
    lumiblockn BIGINT,
    bunchid BIGINT
  )
  DISTRIBUTE BY HASH (runnumber) INTO 64 BUCKETS
  TBLPROPERTIES(
    'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
    'kudu.table_name' = 'example_table',
    'kudu.master_addresses' = 'kudu-master.cern.ch:7051',
    'kudu.key_columns' = 'runnumber, eventnumber'
  );

DMLs:

  insert into kudu_example values (1, 30, 'test', ...);
  insert into kudu_example select * from data_parquet;
  update kudu_example set datatype = 'test' where runnumber = 1;
  delete from kudu_example where project = 'test';

Queries:

  select count(*), max(eventnumber) from kudu_example
    where datatype like '%AOD%' group by runnumber;
  select * from kudu_example k, parquet_table p
    where k.runnumber = p.runnumber;

Creating a table with Java

  import org.kududb.*;

  // CREATING TABLE
  String tableName = "my_table";
  String KUDU_MASTER_NAME = "master.cern.ch";
  KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER_NAME).build();

  List<ColumnSchema> columns = new ArrayList<>();
  columns.add(new ColumnSchema.ColumnSchemaBuilder("runnumber", Type.INT64)
      .key(true).encoding(ColumnSchema.Encoding.BIT_SHUFFLE).nullable(false)
      .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
  columns.add(new ColumnSchema.ColumnSchemaBuilder("eventnumber", Type.INT64)
      .key(true).encoding(ColumnSchema.Encoding.BIT_SHUFFLE).nullable(false)
      .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
  ...
  Schema schema = new Schema(columns);

  List<String> partColumns = new ArrayList<>();
  partColumns.add("runnumber");
  partColumns.add("eventnumber");
  CreateTableOptions options = new CreateTableOptions()
      .addHashPartitions(partColumns, 64).setNumReplicas(3);
  client.createTable(tableName, schema, options);
  ...

Inserting rows with Java

  // INSERTING
  KuduTable table = client.openTable(tableName);
  KuduSession session = client.newSession();

  Insert insert = table.newInsert();
  PartialRow row = insert.getRow();
  row.addLong(0, 1);
  row.addString(2, "test");
  ...
  session.apply(insert); // stores them in memory on client side (for batch upload)

  session.flush(); // sends data to Kudu
  ...

Scanner in Java

  // configuring column projection
  List<String> projectColumns = new ArrayList<>();
  projectColumns.add("runnumber");
  projectColumns.add("dataType");

  // setting a scan range
  PartialRow start = s.newPartialRow(); // s is the table Schema
  start.addLong("runnumber", 8);
  PartialRow end = s.newPartialRow();
  end.addLong("runnumber", 10);

  KuduScanner scanner = client.newScannerBuilder(table)
      .lowerBound(start)
      .exclusiveUpperBound(end)
      .setProjectedColumnNames(projectColumns)
      .build();

  while (scanner.hasMoreRows()) {
      RowResultIterator results = scanner.nextRows();
      while (results.hasNext()) {
          RowResult result = results.next();
          System.out.println(result.getString(1)); // getting 2nd column
      }
  }

Spark with Kudu

  wget http://central.maven.org/maven2/org/apache/kudu/kudu-spark_2.10/1.0.0/kudu-spark_2.10-1.0.0.jar
  spark-shell --jars kudu-spark_2.10-1.0.0.jar

  import org.apache.kudu.spark.kudu._

  // Read a table from Kudu
  val df = sqlContext.read.options(
    Map("kudu.master" -> "kudu_master.cern.ch:7051",
        "kudu.table" -> "kudu_table")).kudu

  // Query using the DF API
  df.select(df("runnumber"), df("eventnumber"), df("db0"))
    .filter(df("runnumber") === 169864).filter(df("eventnumber") === 1).show()

  // ...or register a temporary table and use SQL
  df.registerTempTable("kudu_table")
  sqlContext.sql("select id from kudu_table where id = 5").show()

  // Create a new Kudu table from a dataframe schema
  // NB: no rows from the dataframe are inserted into the table
  kuduContext.createTable("test_table", df.schema, Seq("key"),
    new CreateTableOptions().setNumReplicas(1))

  // Insert data
  kuduContext.insertRows(df, "test_table")

Kudu Security To be done!

Performance (based on ATLAS EventIndex case)

Average row length
- Very good compression ratio – about the same as Parquet
- Each row consists of 56 attributes: most of them strings, a few integers and floats
[Bar chart: average row length in bytes for kudu, parquet, hbase and avro under no compression, Snappy and gzip-like compression, with the length of the same row in CSV shown for reference]

Insertion rates (per machine, per partition) with Impala
- Average ingestion speed: worse than Parquet, better than HBase
[Bar chart: insertion speed (kHz) for kudu, parquet, hbase and avro under no compression, Snappy and gzip-like compression]

Random lookup with Impala
- Good random data lookup speed – similar to HBase
[Bar chart: average random lookup time in seconds for kudu, parquet, hbase and avro under no compression, Snappy and gzip-like compression]

Data scan rate per core with a predicate on a non-PK column (using Impala)
- Quite good data scanning speed – much better than HBase
- If natively supported predicate operations are used, it is even faster than Parquet
[Bar chart: scan speed (kHz) for kudu, parquet and hbase under no compression, Snappy and gzip-like compression]

Kudu monitoring

Cloudera Manager
- A lot of metrics are published through the servers' HTTP endpoints
- All are collected by CM agents and can be plotted
- Predefined CM dashboards:
  – monitoring of Kudu processes
  – workload plots
- CM can also be used for Kudu configuration

CM – Kudu host status

CM - Workload plots

CM - Resource utilisation

Observations & Conclusions

What is nice about Kudu
- The first in the open-source Big Data world to combine a columnar store with indexing
- Simple to deploy
- It works (almost) without problems
- It scales (depending on how the schema is designed): writing, accessing, scanning
- Integrated with mainstream Big Data processing frameworks: Spark, Impala, Hive, MapReduce
  – SQL and NoSQL on the same data
- Gives more flexibility in optimizing the schema design compared to HBase (two levels of partitioning)
- Cloudera is pushing to deliver production quality of the software ASAP

What is bad about Kudu?
- No security yet (it should be added in the next releases)
  – authentication (who connected)
  – authorization (ACLs)
- Raft consensus does not always work as it should
  – too frequent tablet leader changes (sometimes a leader cannot be elected at all)
  – the period without a leader is quite long (and sometimes never ends)
  – this freezes updates on the affected tables
- Handling disk failures: you have to erase/reinitialize the entire server
- Only one index per table
- No nested types (but there is a binary type)
- Cannot control tablet placement on servers

When can Kudu be useful?
- When you have structured 'big data'
  – like in an RDBMS, without complex types
- When sequential and random data access are required simultaneously and have to scale
  – data extraction and analytics at the same time
  – time series
- When low ingestion latency is needed
  – and a lambda architecture is too expensive

Learn more
- Main page: https://kudu.apache.org/
- Video: https://www.oreilly.com/ideas/kudu-resolving-transactional-and-analytic-trade-offs-in-hadoop
- Whitepaper: http://kudu.apache.org/kudu.pdf
- KUDU project: https://github.com/cloudera/kudu
- Some Java code examples: https://gitlab.cern.ch:8443/zbaranow/kudu-atlas-eventindex
- Get the Cloudera Quickstart VM and test it
