@xuchuanyin
Created September 11, 2019 12:19
carbondata_doc_share

CarbonData Support External DataSource

1 Business Scenario

In the business detail-record query scenario, each kind of record has many fields, but application queries use only a small portion (about 30%) of them. Take a detail record called 'Other' for example: it contains about 300 fields, yet statistics on one week of queries showed that only 63 fields were used.

Transforming all of the records from CSV to CarbonData consumes a large amount of CPU and disk IO. To reduce this cost and improve data loading performance, we propose to distinguish the hot fields from the cold ones. The complete original data is called cold data, while the portion containing only the hot fields is called hot data.

During data loading, we take the original input CSV directly as the cold data and neither move it nor make a copy of it. We then read the CSV and extract only the hot fields to generate the hot data, which is stored in the CarbonData format.

As for querying, CarbonData internally analyzes the query to decide which part of the data to scan. For example, if the query references any cold field, the cold data is scanned. Indexes can be created on both cold and hot data to speed up queries on either.

We can make use of CarbonData's Materialized View (MV) to achieve this goal, as described below:

  1. Create a TEXTFILE based CarbonData table called fact_table that contains all the fields of the detail record.
  2. Create an index called fact_dm and an MV called fact_mv on fact_table. fact_mv contains only the hot columns. Also create an index called mv_dm on fact_mv.
  3. Call CarbonData's ADD SEGMENT command to load data; it maps the CSV data to a CarbonData segment. The following sub-steps complete this procedure and should be wrapped in one transaction:
     a. CarbonData creates a segment and associates its storage path with the directory where the CSV data resides.
     b. CarbonData reads the CSV data and generates fact_dm. In the business use case the datamap is built immediately rather than with deferred rebuild, so this step is done automatically by CarbonData.
     c. At the same time, CarbonData generates fact_mv and mv_dm.
  4. As mentioned above, ADD SEGMENT has the same transaction semantics as LOAD DATA in CarbonData.

2 Usage (How to use)

Caution: the user must perform all the following operations in a CarbonSession.

  • 1 The user manually creates fact_table in CarbonData and specifies TEXTFILE as its storage format
CREATE TABLE fact_table (
  col1 bigint, col2 string, ..., col100 string)
STORED BY 'CarbonData'
TBLPROPERTIES(
  'sourceType'='csv',
  'csv.delimiter'=',',
  'csv.header'='col1,col2,col100');

Description: creates a TEXTFILE based carbon table. The user can specify properties for the input data source in TBLPROPERTIES.

  • 2 The user manually creates indexes on fact_table
CREATE DATAMAP fact_dm1
ON TABLE fact_table
using 'minmax';

CREATE DATAMAP fact_dm2
ON TABLE fact_table
using 'bloomfilter'
DMPROPERTIES(
  'INDEX_COLUMNS'='col1');
  • 3 The user manually creates fact_mv
CREATE DATAMAP fact_mv
using 'mv'
DMPROPERTIES(
  'SORT_COLUMNS'='col1, col5',
  'DICTIONARY_INCLUDE'='col5, col10',
  'NO_INVERTED_INDEX'='col1, col5',
  'TABLE_BLOCKSIZE'='128MB',
  'SORT_SCOPE'='local_sort')
as
  'SELECT col1, col2, ..., col20 FROM fact_table';

Note: in Step 4, the grammar of creating a datamap on another datamap is a little confusing. It would be better to add syntactic sugar for creating an MV, such as:

CREATE MATERIALIZED VIEW fact_mv
DMPROPERTIES(
  'SORT_COLUMNS'='col1, col5',
  'DICTIONARY_INCLUDE'='col5, col10',
  'NO_INVERTED_INDEX'='col1, col5',
  'TABLE_BLOCKSIZE'='128MB',
  'SORT_SCOPE'='local_sort')
as
  'SELECT col1, col2, ..., col20 FROM fact_table';
  • 4 The user manually creates indexes on fact_mv
CREATE DATAMAP mv_dm1
ON TABLE fact_mv
using 'bloomfilter'
DMPROPERTIES(
  'INDEX_COLUMNS'='col1');
  • 5 The user manually loads CSV data into fact_table; all the MVs and indexes are then updated. This operation is repeated whenever new CSV data is generated periodically.

In the business scenario, to reduce data movement, we simply map the original CSV to a CarbonData segment using the following statement.

ALTER TABLE fact_table
ADD SEGMENT LOCATION 'path/to/data1'

Note: the above grammar is not yet available in CarbonData; it needs to be implemented.

To support common scenarios that can tolerate moving the cold data, CarbonData can also copy the data into the segment directory using the following statement.

LOAD DATA INPATH 'path/to/data1'
INTO TABLE fact_table

Note: for the above two statements, to keep the same behavior as Hive, CarbonData does not validate the schema or file format of the source when it maps (or copies) the original cold data to a segment. If something is wrong, a runtime error will occur while generating indexes or MVs.

  • 6 The user triggers compaction on fact_table

Compaction can be performed on fact_table.

During compaction, the cold data is read-only; in the business scenario we do not rewrite it. Only the indexes and MVs on fact_table, and the indexes on the MV, are rewritten.

Moreover, in other scenarios where rewriting the cold data is acceptable, CarbonData can also move the cold data or merge small files. Note that the CSV files may be compressed, so merging cold data is not a simple concatenation.
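
For reference, a minimal sketch of triggering compaction with CarbonData's existing grammar (the statements below assume the fact_table defined earlier):

-- Merge a small number of recent segments
ALTER TABLE fact_table COMPACT 'MINOR';
-- Merge segments up to the configured size threshold
ALTER TABLE fact_table COMPACT 'MAJOR';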

  • 7 Query on fact_table

From the users' point of view, only fact_table is visible. The query SQL is identical with or without this feature.

Using the indexes and MVs, the CarbonData framework automatically optimizes queries on fact_table and rewrites them when possible.
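
For illustration, a query touching only hot columns may be rewritten against fact_mv, while a query touching a cold column falls back to the cold data, pruned by fact_dm (col50 is assumed here to be one of the columns not included in fact_mv):

-- Only hot columns: may be rewritten to read from fact_mv (and use mv_dm)
SELECT col1, col5 FROM fact_table WHERE col5 = 'abc';

-- Includes a cold column: scanned from the cold data, pruned by fact_dm
SELECT col1, col50 FROM fact_table WHERE col1 = 100;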

3 Comparing with alternatives

  • 1 Directly building index datamap and MV on Hive/Spark table

    It would be great to support building datamaps and MVs directly on Hive/Spark tables, but there would be problems with this feature. The most important is the incremental build of the datamap, since it is difficult to find the newly added data files.

    Take index datamaps for example: the index is file scoped -- we build indexes for each file, such as value min/max or value existence per file. If the table is a common Hive table, the user can perform all DML statements on it, such as LOAD DATA, ADD PARTITION, ALTER PARTITION, DROP PARTITION, INSERT SELECT etc. (which may involve data transformation). To generate indexes, CarbonData would have to hook all these statements, find all the newly generated data files, and then read those files and generate indexes for them.

    If it is a carbon table, we can block all other DML statements and provide only ADD SEGMENT and LOAD DATA to map the original source data into CarbonData. There is no data transformation and we can easily obtain the newly added files. The most valuable scenario is mapping historical Hive/Spark data (in CSV/Parquet/ORC format) into CarbonData and leveraging the datamap features to enhance query performance.

4 More Details

Create CarbonData External DataSource Table

An external datasource carbon table is still a carbon table. There is no difference from a native carbon table except that its storage file format is not carbondata but CSV, Parquet, ORC etc.

We should add properties to the table's metadata to indicate its file format, and CarbonData will adapt its data loading and querying behavior based on that format.

Load Data

CarbonData will create a new segment for each load.

We provide a new grammar called ADD SEGMENT to load data into an external datasource table. Using it, CarbonData stores the original data file locations in the segment metadata. Since there is no data movement, the segment folder will be empty.

We also provide LOAD DATA for data loading. In this case, the original data files are copied into CarbonData's segment folder; since there is data movement, the segment folder will not be empty.

Incremental Build on Index Datamap

As mentioned above, the user maps the original source data into CarbonData using the ADD SEGMENT statement, which carries the file information. CarbonData creates a new segment mapped to these files, and all of them are considered newly added files. CarbonData then reads the files and generates the indexes for each file.

A new procedure is needed to support generating file scope indexes for an entire file.

Query with Index Datamap

When querying through CarbonSession, CarbonData checks whether the query can leverage datamaps. If it can, querying the datamap prunes the data files that do not match the query. CarbonData then internally calls the Spark related procedure with the pruned result to query the original data files.

Incremental Build on MV

The incremental build on MV is the same as that on index datamap.

Query with MV

The query with MV is the same as that on index datamap.

5 Requirements for CarbonData (Features to implement)

  • 1 CarbonData should support creating indexes and MVs for CarbonData external datasource tables (tables that are not stored in the CarbonData file format).
  • 2 One of the original intentions of separating hot and cold fields in a table is to reduce disk IO. Therefore, during data loading, we do not want to relocate the cold data. Since the cold data is shared with third-party applications after it is generated, we cannot move it into the carbon store. Instead, we want to map the cold data into the carbon store (or use other techniques such as symbolic links). We prefer to add a new statement that adds a segment and maps the data to be loaded to the segment's storage path -- this is called loading of cold data.
  • 3 While the cold data is loaded, index information can be generated, such as min-max information for each column in each file. When the cold data is queried, this index information can be used to enhance query performance.
  • 4 While the cold data is loaded, the MV and its indexes can also be generated. These operations should be part of one transaction.
  • 5 Note that the cold data is always compressed and the compression algorithm is Hadoop-compatible (meaning Hadoop already provides the input/output stream interfaces to read/write such files, for example 'snappy' or 'zstd'). CarbonData should support querying compressed CSV files.
  • 6 To improve loading performance and reduce IO, fact_dm, fact_mv and mv_dm should be generated together, reading the cold data only once.
  • 7 Support compaction for such segments. Compaction should not move or copy the cold data.
  • 8 The business currently does not need to update records, so update support is optional for this feature.
  • 9 From the users' point of view, they only need to care about fact_table. All user queries are based on fact_table. The CarbonData framework analyzes the query SQL, finds the corresponding MV or index, and performs query optimization silently.
@xuchuanyin (Author)

Design doc for carbondata supporting external datasource

Note for developers: blocked operations must be implemented so that they are actually blocked and must have functional test cases; supported operations should have corresponding functional test cases.

Data Management

Create external format table

Support the following grammar to create an external format table

CREATE TABLE fact_table (
  col1 bigint, col2 string, ..., col100 string)
STORED BY 'CarbonData'
TBLPROPERTIES(
  'format'='csv',
  'csv.delimiter'=',',
  'csv.header'='col1,col2,col100');

Note:

  • The 'format' key in TBLPROPERTIES indicates that this table is an external format table; it is still a carbon managed table.
  • An external format table only allows those properties whose prefix is the value of 'format' (for example 'csv.delimiter' when 'format'='csv').
  • For an external CSV format table, if the user specifies 'csv.header', the header must contain the table fields.
  • The 'DESC' and 'DESC FORMATTED' commands should display external format related information.
  • We do not support creating an external format table through CTAS.

Load data to external format table

Currently an external format table supports only one way to load data; the grammar looks like this:

ALTER TABLE fact_table
ADD SEGMENT LOCATION 'path/to/data1,path/to/data2'

Note:

  • The user can specify multiple paths separated by commas.
  • Each path can be a directory or a file. If it is a directory, carbondata will search it recursively for files.
  • Whether the user can specify two paths that are exactly the same, or two paths that have common files: (PENDING) give an error for it. ==> DISCUSS OFFLINE
  • We do not support LOAD DATA, INSERT SELECT or DataFrame.write on an external format table.
  • AddSegment only creates a new segment in the table and maps the paths to that segment by recording the fact file path information in the segment's metadata.

==> mention: external table should not support this command.
==> a better name for external format table.

Update and delete records

Currently we do not support update and delete operations on external format tables.

Compaction

We support 'MINOR', 'MAJOR' and 'CUSTOM' compaction on external format tables.
During compaction, the fact data files are not moved; the paths from all the old segments are mapped to the new segment.
We must take care with 'clean segments after compaction': it should not delete the fact files of the compacted segments, because those files are still used by the new segment.

Drop external format table

Dropping an external format table will not delete the fact files by default.
Maybe we can add a create-table property to indicate whether carbondata should delete the fact files. (PENDING)

Alter table

Supported operations (example statements are sketched after the lists below):

  • RENAME TABLE
  • DROP COLUMNS
  • CHANGE DATA TYPE

Blocked operations:

  • ADD COLUMNS
  • MERGE INDEX
  • SET and UNSET for table properties not related to the external format
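
A sketch of the supported operations, using the existing carbon ALTER grammar (col2 and col3 are illustrative; the CHANGE example assumes col3 is an INT column widened to BIGINT):

ALTER TABLE fact_table RENAME TO fact_table_renamed;
ALTER TABLE fact_table DROP COLUMNS (col2);
ALTER TABLE fact_table CHANGE col3 col3 BIGINT;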

Partition

External format tables do not support partitioning for now, so partition related commands, such as creating a partitioned external format table and the partition management commands, are blocked.

Bucketing

External format tables do not support bucketing for now.
Since bucketing is specified in TBLPROPERTIES and such properties are blocked when creating an external format table, we do not need to do anything extra here.

Segment Management

Add segment

Add segment is the only way to load data into an external format table. Please refer to the 'Load data to external format table' section under 'Data Management'.

Show segment

It seems the ShowSegment output displays the file format of each segment, so we should correct it to display external format related information.
==> About the data size of the segment: get the size at runtime or store it at AddSegment time. (Note this in the documentation)
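
For reference, a sketch of the existing grammar whose output would need this adjustment:

SHOW SEGMENTS FOR TABLE fact_table;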

Delete segment

We will support the DeleteSegmentById and DeleteSegmentByDate operations.
Delete segment will not clean up the fact files by default; its behavior is the same as 'drop table'.
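
A sketch of the two operations using the existing carbon grammar (the segment ids and date are illustrative):

-- Delete by segment id
DELETE FROM TABLE fact_table WHERE SEGMENT.ID IN (0, 1);
-- Delete by load date
DELETE FROM TABLE fact_table WHERE SEGMENT.STARTTIME BEFORE '2019-09-01 00:00:00';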

Query with specified segments

We will support query with specified segments.
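
A sketch using the existing carbon.input.segments property (assuming fact_table lives in the default database):

-- Restrict the following queries to segments 1 and 2
SET carbon.input.segments.default.fact_table = 1,2;
SELECT count(*) FROM fact_table;
-- Reset to query all segments
SET carbon.input.segments.default.fact_table = *;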

Index DataMap

MinMax and BloomFilter DataMap

We will support index datamaps on external format tables to accelerate queries.

Currently the index is a file level index and we plan to provide two types: MinMax and BloomFilter. The MinMax index gives the value range in each file and the BloomFilter index tells whether a specified value exists in a file.

Since the datamap is file level, we generate the indexes per file and use the file path as the fake shardName for the index data file.
The structure looks like below:

${datamap_name}/${segment_name}/File_level_${fact_file1_path_with_base64_encoding}/${column_name}.bloomindex
                             ../File_level_${fact_file2_path_with_base64_encoding}/${column_name}.bloomindex

During development we will try to reuse the current code. Taking the bloomfilter index as an example, some simple implementation designs are shown below.

Query

Currently BloomFilterDataMapModel stores 'shardName' and 'absoluteBlockletId', and after pruning the datamap returns this information in an 'ExtendBlocklet' object. A subsequent procedure then transforms it into 'blockFilePath' and 'relativeBlockletId' (within that block file).

An earlier plan (since struck out) was: because the bloomfilter for external format works at file scope, refactor the previous pruning implementation so that, during pruning, the datamap itself transforms 'shardName' and 'absoluteBlockletId' into 'blockFilePath' and 'relativeBlockletId' and returns the 'ExtendBlocklet' containing this information, i.e. move the transformation forward into pruning. With that approach, for an external format table the 'blockFilePath' would be the fact file path and 'relativeBlockletId' would be '-1', meaning the whole file is scanned.

We will handle getSplits for external format tables separately. The pruning result of the datamap is a Blocklet that represents a whole file, and the datamap pruning procedure can be reused because each file can be treated as a shard that contains only one blocklet.

Load data for index datamap

During the AddSegment procedure, we trigger data loading for the index datamaps of the external format table. A new RDD reads the fact files and generates the indexes. Each file is one Spark task; we read the file once and generate all the indexes for that file. Each time we start/finish reading a file, the onBlockStart/onBlockEnd listener is notified.

Furthermore, to reduce disk reads when both an index datamap and an MV exist on the same external format table, we may look for a way to read each file only once. This feature may not be provided in 1.5.1.

Compact fact table which has index datamap

Since the index is file level and compaction just re-maps the fact files to the new segment, for the index datamap we can simply move the file level index files into the new folder.
Before:

${datamap_name}/segment_0/File_level_${fact_file1_path_with_base64_encoding}/${column_name}.bloomindex
               segment_0/File_level_${fact_file2_path_with_base64_encoding}/${column_name}.bloomindex
               segment_1/File_level_${fact_file3_path_with_base64_encoding}/${column_name}.bloomindex
               segment_1/File_level_${fact_file4_path_with_base64_encoding}/${column_name}.bloomindex

After:

${datamap_name}/segment_0.1/File_level_${fact_file1_path_with_base64_encoding}/${column_name}.bloomindex
                segment_0.1/File_level_${fact_file2_path_with_base64_encoding}/${column_name}.bloomindex
                segment_0.1/File_level_${fact_file3_path_with_base64_encoding}/${column_name}.bloomindex
                segment_0.1/File_level_${fact_file4_path_with_base64_encoding}/${column_name}.bloomindex

==> minmax/bloomfilter index file merge

Rebuild (create datamap on existed data)

In this case, we will directly read the index columns from the fact files and generate the index data.
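
A sketch of this flow using the deferred-rebuild grammar (the exact REBUILD syntax is assumed here, and col1 is illustrative):

CREATE DATAMAP fact_dm_deferred
ON TABLE fact_table
using 'bloomfilter'
WITH DEFERRED REBUILD
DMPROPERTIES(
  'INDEX_COLUMNS'='col1');

-- Build the index for the data that already exists in fact_table
REBUILD DATAMAP fact_dm_deferred;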

Preaggregate DataMap

We will not support the preaggregate datamap on external format tables for now, simply because we do not need it yet.

MV

Create MV on external format table

We will support creating MV on external format table.

Because MV is a carbon table, to accelerate the query performance on MV, carbondata should support specifying table properties for MV, such as 'SORT_COLUMNS', 'DICTIONARY_INCLUDE', 'TABLE_BLOCKSIZE' etc.

Data loading for MV

For general MV data loading, since the MV is treated as a carbon table, carbondata internally loads data via insert-select, which we call the dataframe-way.

But according to our observations, data loading through the dataframe-way is much slower than the datafile-way (loading via LOAD DATA). So we propose that, when the MV query is a plain selection, the MV data loading be optimized internally using the datafile-way.
==> For Parquet, the datafile-way is impossible.
==> Find out why the dataframe-way is slow! (Brijoo)

Incremental rebuild for MV

An MV that has no join can be rebuilt incrementally.

Concurrent rebuild for MV

Carbondata should block concurrent rebuilds of an MV (or, if carbondata can ensure the correctness of the rebuild, it can support this operation).

Since an MV may depend on several base tables, which may not even be carbon tables, the rebuild may fail in most situations. (For example, we cannot block data management on a non-carbon table, which can affect the MV rebuild.)

If the MV rebuild fails, carbondata should revert the operation.

Rebuild MV automatically

We can create a non-deferred-rebuild MV.

Once data is loaded into the base table, the MV rebuild is triggered immediately. The two procedures are considered one transaction.
==> May not support MV in this transaction? (mention the requirement in the MV feature, not in this design)

If we do not support concurrent MV rebuilds, then we also cannot support concurrent data loading for a carbon table that has a non-deferred-rebuild MV.
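
A minimal sketch of creating such a non-deferred-rebuild MV, following the quoted-query convention used earlier in this doc (omitting WITH DEFERRED REBUILD makes the rebuild automatic on each load):

CREATE DATAMAP fact_mv_auto
using 'mv'
as
  'SELECT col1, col5, col20 FROM fact_table';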

Compaction for MV

To accelerate query performance, we may need to compact the MV. Since the segments of the fact table and the MV have a 1-to-1 mapping, we want compaction to keep this mapping. For example, compacting the fact table's segments #0, #1, #2 into #0.1 will result in a corresponding compaction of the MV's segments #0, #1, #2 into #0.1.

Segment management for MV

Support index datamap on MV

We will support index datamap on MV.

Others Pending

If a table has an MV, will it

  • support compaction?
  • support delete segment?
