In the business detail record query scenario, each kind of record has many fields, but queries from applications use only a small portion (about 30%) of them. Take a detail record called 'Other' for example: it contains about 300 fields. We gathered statistics on the queries and found that only 63 fields were used in one week.
Transforming the full set of records from CSV to CarbonData consumes a large amount of CPU and disk IO. To reduce this cost and improve data loading performance, we propose to distinguish the Hot fields from the Cold fields. The full set of data is called Cold Data, while the portion covering the hot fields is called Hot Data.
During data loading, we directly take the original input CSV data as the cold data and do not move or copy it. Then we read the CSV, extract only the hot fields to generate the hot data, and store it as CarbonData.
As for querying, CarbonData internally analyzes the query to decide which part of the data will be scanned. For example, if the query touches any cold fields, the cold data will be scanned. Indexes can be created on both the cold and the hot data to make querying faster.
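Conceptually, the hot-data extraction is a single projection pass over the cold CSV. A minimal sketch in Python (the field names are hypothetical, and the real loading path writes CarbonData files rather than in-memory rows):

```python
import csv
import io

# Hypothetical hot-field subset: the real 'Other' record has ~300 fields,
# of which only ~63 were queried in a week.
HOT_FIELDS = ["col1", "col5", "col10"]

def extract_hot_fields(cold_csv_text, hot_fields):
    """Read the cold CSV once and project out only the hot columns.

    The cold CSV itself is left in place; only this projection would be
    re-encoded as CarbonData in the real loading path.
    """
    reader = csv.DictReader(io.StringIO(cold_csv_text))
    return [{name: row[name] for name in hot_fields} for row in reader]

cold = "col1,col5,col10,col200\n1,a,x,unused\n2,b,y,unused\n"
hot_rows = extract_hot_fields(cold, HOT_FIELDS)
```

The cold file is read exactly once and never rewritten; only the projected hot columns incur the CarbonData encoding cost.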
We can make use of the Materialized View (MV) feature of CarbonData to achieve this goal, as follows:
- Create a TEXTFILE based CarbonData table called fact_table that contains all the fields in the detail record.
- Create an index called fact_dm and an MV called fact_mv on fact_table. The fact_mv only contains the hot columns. Also create an index called mv_dm on fact_mv.
- Call CarbonData's ADD SEGMENT command to load data. It maps the CSV data to a CarbonData segment. The following steps are required to complete this procedure and they should be wrapped in a transaction:
  - CarbonData creates a segment and associates its storage path with the directory where the CSV data resides.
  - CarbonData reads the CSV data and generates fact_dm. In the business use case, the datamap is loaded immediately rather than with deferred rebuild, so this step is done automatically by CarbonData.
  - At the same time, CarbonData generates fact_mv and mv_dm.
  - As mentioned above, ADD SEGMENT has the same transaction semantics as LOAD DATA in CarbonData.
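The transactional flow above can be sketched as follows. All class and function names are hypothetical placeholders, not CarbonData APIs:

```python
# Sketch of the transactional ADD SEGMENT flow: the segment is published
# only if every derived structure builds successfully.
class Segment:
    def __init__(self, csv_dir):
        self.csv_dir = csv_dir   # cold data stays where it is
        self.fact_dm = None      # index on fact_table
        self.fact_mv = None      # hot-column MV
        self.mv_dm = None        # index on fact_mv

def add_segment(table_segments, csv_dir, build_index, build_mv):
    """Create a segment mapped to csv_dir; commit it only if fact_dm,
    fact_mv and mv_dm are all generated without error."""
    seg = Segment(csv_dir)
    try:
        seg.fact_dm = build_index(csv_dir)           # generate fact_dm
        seg.fact_mv, seg.mv_dm = build_mv(csv_dir)   # generate fact_mv + mv_dm
    except Exception:
        return None              # roll back: the segment is never published
    table_segments.append(seg)   # commit, same semantics as LOAD DATA
    return seg
```

If any builder fails, the segment is simply never appended to the table's segment list, which is the rollback behavior the transaction requires.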
Caution: the user must perform all the following operations in CarbonSession.
- 1 User manually create fact_table in CarbonData and specify its storage format as TEXTFILE format
CREATE TABLE fact_table (
col1 bigint, col2 string, ..., col100 string)
STORED BY 'CarbonData'
TBLPROPERTIES(
'sourceType'='csv',
'csv.delimiter'=',',
'csv.header'='col1,col2,col100');
Description: Create a TEXTFILE based carbon table. User can specify the properties for the input data source in the TBLPROPERTIES.
- 2 User manually create indexes on fact_table
CREATE DATAMAP fact_dm1
ON TABLE fact_table
using 'minmax';
CREATE DATAMAP fact_dm2
ON TABLE fact_table
using 'bloomfilter'
DMPROPERTIES(
'INDEX_COLUMNS'='col1');
- 3 User manually create fact_mv
CREATE DATAMAP fact_mv
using 'mv'
DMPROPERTIES(
'SORT_COLUMNS'='col1, col5',
'DICTIONARY_INCLUDE'='col5, col10',
'NO_INVERTED_INDEX'='col1, col5',
'TABLE_BLOCKSIZE'='128MB',
'SORT_SCOPE'='local_sort')
as
'SELECT col1, col2, ..., col20 FROM fact_table';
Note: In Step 4, the grammar of creating a datamap on a datamap is a little confusing. It would be better to add syntactic sugar to create an MV, such as:
CREATE MATERIALIZED VIEW fact_mv
DMPROPERTIES(
'SORT_COLUMNS'='col1, col5',
'DICTIONARY_INCLUDE'='col5, col10',
'NO_INVERTED_INDEX'='col1, col5',
'TABLE_BLOCKSIZE'='128MB',
'SORT_SCOPE'='local_sort')
as
'SELECT col1, col2, ..., col20 FROM fact_table';
- 4 User manually create indexes on fact_mv
CREATE DATAMAP mv_dm1
ON TABLE fact_mv
using 'bloomfilter'
DMPROPERTIES(
'INDEX_COLUMNS'='col1');
- 5 User manually load csv data to fact_table; all the MVs and indexes will be updated. This operation should be done multiple times if the csv is generated periodically.
In the business scenario, in order to reduce data movement, we just map the original csv to a CarbonData segment using the following statement.
ALTER TABLE fact_table
ADD SEGMENT LOCATION 'path/to/data1'
Note: The above grammar is not available in CarbonData yet; we need to implement it later.
To support common scenarios which can bear the movement of cold data, CarbonData can also copy data to the segment directory using the following statement.
LOAD DATA INPATH 'path/to/data1'
INTO TABLE fact_table
Note: For the above two statements, to keep the same behavior as Hive, CarbonData does not validate the schema and file format of the source while it maps (or copies) the original cold data to a segment. If something is wrong, a runtime error will occur when generating indexes or MVs.
- 6 User trigger compaction on fact_table
Compaction can be performed on fact_table.
During compaction, the cold data is read-only; we do not rewrite it in the business scenario. Only indexes and MVs on fact_table and indexes on the MV will be rewritten.
Moreover, in other scenarios where rewriting the cold data is acceptable, CarbonData can also move the cold data or merge small files. Notice that the csv files may be compressed, so merging cold data will not be simple concatenation.
- 7 Query on fact_table
From the users' view, they can only see fact_table. The query SQL is the same with or without this feature.
By using the indexes and MVs, the CarbonData framework automatically optimizes queries on fact_table and rewrites the query when possible.
- 1 Directly building index datamap and MV on Hive/Spark table
It would be great if we could support building datamaps and MVs directly on Hive/Spark tables, but there are problems with supporting this feature. The most important is that incremental build of the datamap is difficult, since it is hard to get the newly added data files.
Take the index datamap for example: the index is file scope -- we build indexes for each file, such as the value min/max or value existence in each file. If the table is a common Hive table, the user can perform all DML statements on it, such as LOAD DATA, ADD PARTITION, ALTER PARTITION, DROP PARTITION, INSERT SELECT etc. (there may be data transformation in them). In order to generate indexes, CarbonData would have to hook all these statements, find all the newly generated data files, and then read those files and generate indexes for them. If it is a carbon table, we can block all the DML statements and only provide ADD SEGMENT and LOAD DATA to map the original source data to CarbonData. There is no data transformation and we can easily get the newly added files. The most valuable scenario is that we can map historical Hive/Spark data (in CSV/Parquet/ORC format) to CarbonData and leverage the datamap features to enhance query performance.
The external datasource carbon table is still a carbon table. There is no difference from a native carbon table except that its storage file format is not carbondata but CSV, Parquet, ORC etc.
We should add properties in table's meta to indicate its file format. And CarbonData will change the behavior of data loading and querying based on the file format.
CarbonData will create a new segment for each load.
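The behavior switch on the table's file-format property might look like the following sketch (the property name and return values are hypothetical placeholders for the internal load paths):

```python
# Hypothetical dispatch on a file-format property stored in the table meta.
def load_strategy(table_properties):
    fmt = table_properties.get("fileformat", "carbondata")
    if fmt == "carbondata":
        return "transcode"     # native path: rewrite data as carbon files
    return "map-in-place"      # external formats (csv/parquet/orc): ADD SEGMENT

native = load_strategy({})
external = load_strategy({"fileformat": "csv"})
```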
We provide a new grammar called ADD SEGMENT to load data into an external datasource table. By using it, CarbonData stores the original data file locations in the segment metadata. Since there is no data movement, the segment folder will be empty.
We also provide LOAD DATA for data loading. In this case, it copies the original data files into CarbonData's segment folder. Since there is data movement, the segment folder will not be empty.
As mentioned above, users map the original source data to CarbonData using the ADD SEGMENT statement, which contains the file information. CarbonData creates a new segment mapped to these files, and all of the files are considered newly added. CarbonData reads the files and generates the indexes for each file.
A new procedure is needed to support generating file scope indexes for an entire file.
While querying through CarbonSession, CarbonData checks whether the query can leverage datamaps. If it can, the query on the datamap prunes the data files that do not match the query. Then CarbonData internally calls the related Spark procedure with the pruned result to query the original data files.
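The file-scope index generation and pruning described above can be sketched in pure Python over a single indexed column (illustrative only; the real index is persisted per segment and built inside CarbonData):

```python
# Illustrative file-scope min-max index: one (min, max) pair per fact file
# for a single indexed column.
def build_minmax(files):
    """files maps a fact-file path to the column values it contains."""
    return {path: (min(vals), max(vals)) for path, vals in files.items()}

def prune(minmax_index, value):
    """Keep only the files whose [min, max] range could contain `value`;
    every other file is skipped without being read."""
    return [path for path, (lo, hi) in minmax_index.items()
            if lo <= value <= hi]

files = {"seg0/part1.csv": [1, 5, 9], "seg0/part2.csv": [20, 25, 30]}
idx = build_minmax(files)
```

A point query such as `col = 25` then scans only `seg0/part2.csv`, which is the whole benefit of the file-level index on otherwise unsorted cold data.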
The incremental build on MV is the same as that on index datamap.
The query with MV is the same as that on index datamap.
- 1 CarbonData supports creating indexes and MVs for a CarbonData-External-DataSource table (a table that is not stored in the CarbonData file format).
- 2 One of the original intentions of hot/cold fields on a table is to reduce disk IO. Therefore, during data loading, we wish not to relocate the cold data. Since the cold data will be shared with third-party applications after it is generated, we cannot move it into the carbon store. As a result, we wish to map the cold data into the carbon store (or use other techniques such as symbolic links). We prefer to add a new statement which adds a segment and maps the data to be loaded to the segment's storage path -- this is called loading of cold data.
- 3 While the cold data is loaded, index information can be generated, such as min-max information for each column in each file. When the cold data is queried, this index information can be used to enhance query performance.
- 4 While the cold data is loaded, MV and its indexes can also be generated. They should be in one transaction.
- 5 Note that the cold data is always compressed and the compression algorithm is hadoop-compatible (meaning hadoop already provides input/output-stream interfaces to read/write such files, for example 'snappy' or 'zstd'). CarbonData supports querying on the compressed csv files.
- 6 To improve loading performance and reduce IO, the procedures of generating fact_dm, fact_mv and mv_dm should be done together, requiring only one read of the cold data.
- 7 Support compaction for such segments. Compaction should not move or copy the cold data.
- 8 The business currently does not need to update records, so update support for this feature is optional.
- 9 From the users' point of view, they only need to care about fact_table. All user queries will be based on fact_table. The CarbonData framework analyzes the query SQL, finds the corresponding MV or index, and performs query optimization silently.
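Point 5 above implies that queries must stream-decompress the cold files in place. A minimal Python sketch using gzip (standing in for any hadoop-compatible codec such as snappy or zstd):

```python
import csv
import gzip
import os
import tempfile

def read_compressed_csv(path):
    """Stream-decompress a gzip'd CSV and yield its rows; the file on
    disk stays compressed, so the cold data is never rewritten."""
    with gzip.open(path, "rt", newline="") as f:
        yield from csv.reader(f)

# Round-trip demo: write a compressed CSV, then scan it.
fd, path = tempfile.mkstemp(suffix=".csv.gz")
os.close(fd)
with gzip.open(path, "wt", newline="") as f:
    f.write("col1,col2\n1,a\n2,b\n")
rows = list(read_compressed_csv(path))
os.remove(path)
```

Because the codec streams, this also explains why merging compressed cold files cannot be simple concatenation: rows must be decoded before they can be re-encoded into one file.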
Design doc for carbondata supporting external datasource
Note for developers: blocked operations must be implemented to actually block and have functional test cases; supported operations should have corresponding functional test cases.
Data Management
Create external format table
Support the following grammar to create an external format table
Note:
Load data to external format table
Currently the external format table supports only one way to load data; the grammar looks like below
Note:
Block LOAD DATA, INSERT SELECT and DataFrame.write to external format table. ==> mention: external format table should not support these commands.
==> a better name for external format table.
Update and delete records
Currently we do not support update and delete operations on external format table.
Compaction
We support 'MINOR','MAJOR','CUSTOM' compaction on external format table.
During compaction, the fact data files will not be moved. We will map the paths from all the old segments to the new segment.
We should take care of 'CleanSegments after compaction': it should not delete the fact files in the compacted segments, because those fact files are still used by the new segment.
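The path-remapping compaction described above, where fact files are referenced rather than copied, can be sketched as follows (illustrative structures, not CarbonData code):

```python
# Compaction of external format segments: merge the file lists of the old
# segments into a new segment without touching the fact files themselves.
def compact(segments):
    merged = {"files": [], "compacted_from": []}
    for seg_id, seg in segments.items():
        merged["files"].extend(seg["files"])      # re-map, do not copy
        merged["compacted_from"].append(seg_id)   # needed by CleanSegments
    return merged

old = {"0": {"files": ["a.csv"]}, "1": {"files": ["b.csv", "c.csv"]}}
new_seg = compact(old)
```

CleanSegments can then consult `compacted_from` before deleting anything, since every fact file in the old segments is still referenced by the new one.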
Drop external format table
Drop external format table will not delete the fact files by default.
Maybe we can add a property in create table to indicate whether carbondata should delete the fact files. (PENDING)
Alter table
Supported operations:
Blocked operations:
Partition
External format table does not support partition now, so partition-related commands such as 'create partition table for external format' and 'partition management' commands are blocked.
Bucketing
External format table does not support bucketing now.
Since 'bucketing' is specified in TBLPROPERTIES and it is blocked when creating an external format table, we do not need to do extra operations here.
Segment Management
Add segment
Add segment is the only way to load data to external format table. Please refer to 'Load data to external format table' section in 'Data Management'.
Show segment
It seems that the output of ShowSegment displays the file format of that segment; we should correct it to display external format related information.
==> about the data size of the segment: get the size at runtime or store it at ADD SEGMENT. (Note in documentation)
Delete segment
We will support the DeleteSegmentById and DeleteSegmentByDate operations.
Delete segment will not clean the fact files by default. Its behavior is the same as 'drop table'.
Query with specified segments
We will support query with specified segments.
Index DataMap
MinMax and BloomFilter DataMap
We will support index datamap on external format table to accelerate the query.
Currently the index is file level index and we plan to provide two types of index: MinMax and BloomFilter index. The MinMax index gives the value range in each file and the BloomFilter index gives the existence of specified value in each file.
Since the datamap is file level, we will generate the indexes per file and use the path of the file as the fake shardName for data index file.
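A minimal per-file Bloom filter sketch, keyed by the fact file path used as the fake shardName (illustrative only; the real implementation reuses carbondata's existing bloomfilter datamap code):

```python
import hashlib

M = 1024  # bits per filter
K = 3     # hash functions per value

def _bit_positions(value):
    """Derive K deterministic bit positions for a value."""
    for i in range(K):
        digest = hashlib.sha256(f"{i}:{value}".encode()).digest()
        yield int.from_bytes(digest[:4], "big") % M

def build_filter(values):
    bits = 0
    for v in values:
        for pos in _bit_positions(v):
            bits |= 1 << pos
    return bits

def might_contain(bits, value):
    """False means the file definitely lacks the value and can be pruned;
    True means it *might* contain it (false positives are possible)."""
    return all(bits >> pos & 1 for pos in _bit_positions(value))

# One filter per fact file, keyed by the file path (the fake shardName).
file_index = {path: build_filter(vals) for path, vals in
              {"seg0/f1.csv": ["a", "b"], "seg0/f2.csv": ["x"]}.items()}
candidates = [p for p, bits in file_index.items() if might_contain(bits, "x")]
```

Pruning keeps only the candidate files whose filters might contain the queried value; everything else is skipped without touching the fact file.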
The structure looks like below:
During development, we will try to reuse the current code, take bloomfilter index for example, some simple design for implementation are shown as below.
Query
During pruning, the datamap will transform the 'shardName' and 'absoluteBlockletId' to 'blockFilePath' and 'relativeBlockletId' (within that block file), then return an 'ExtendBlocklet' containing this information. In other words, we move this transformation forward into the pruning step.
After this implementation, if the table is an external format table, the 'blockFilePath' is the fact file path and 'relativeBlockletId' is '-1', meaning we will scan the whole file.
We will handle the getSplits for external format table separately. The pruning result of the datamap is a Blocklet that represents a whole file. And the datamap pruning procedure can be reused as we can treat each file as a shard and it only contains one blocklet.
Load data for index datamap
During the AddSegment procedure, we will trigger data loading for the index datamap of the external format table. We will use a new RDD to read the fact files and generate the indexes. Each file is one spark task; we read the file once and generate all the indexes for that file. Each time we start/finish reading a file, the onBlockStart/onBlockEnd listeners are notified.
Furthermore, we want to reduce disk reading if we have index datamap and MV on the same external format table, so we may consider a way to read the file once. This feature may not be provided in 1.5.1.
Compact fact table which has index datamap
Since the index is file level and during compaction we just re-map the fact files to the new segment, for the index datamap we can just move the file-level index files to the new folder.
Before:
After:
==> minmax/bloomfilter index file merge
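The re-homing of file-level index entries during compaction can be sketched as follows (the 'segment/fact_file' key layout is hypothetical):

```python
# Move file-level index entries from the compacted segments into the new
# segment folder; the index data itself is unchanged.
def remap_index(index_store, old_segments, new_segment):
    """index_store maps 'segment/fact_file' -> index blob."""
    for key in list(index_store):
        seg, fact_file = key.split("/", 1)
        if seg in old_segments:
            index_store[f"{new_segment}/{fact_file}"] = index_store.pop(key)
    return index_store

store = {"0/a.csv": "idx_a", "1/b.csv": "idx_b", "2/c.csv": "idx_c"}
remap_index(store, {"0", "1"}, "0.1")
```

Only the key (the folder) changes; no index is rebuilt, which is why file-level indexing makes compaction of external format segments cheap.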
Rebuild (create datamap on existed data)
In this case, we will directly read the index columns from the fact file and generate index data.
Preaggregate DataMap
We will not support preaggregate datamap on external format table now. The reason is that we just do not need it now.
MV
Create MV on external format table
We will support creating MV on external format table.
Because MV is a carbon table, to accelerate the query performance on MV, carbondata should support specifying table properties for MV, such as 'SORT_COLUMNS', 'DICTIONARY_INCLUDE', 'TABLE_BLOCKSIZE' etc.
Data loading for MV
For general data loading for MV, since we treat the MV as a carbon table, carbondata internally does the data loading by insert-select, which is called the dataframe-way.
But according to our observation, data loading through the dataframe-way is much slower than the datafile-way (loading by Load-Data). So we propose that if the query of the MV is just a selection, we would like to optimize the data loading for the MV using the datafile-way internally.
==> for parquet, datafile-way is impossible.
==> find out why dataframe-way is slow! (Brijoo)
Incremental rebuild for MV
An MV which has no join can be rebuilt incrementally.
Concurrent rebuild for MV
Carbondata should block concurrent rebuild for MV. (Or, if carbondata can assure the correctness of the rebuilding, it can support this operation.)
Since an MV may depend on different base tables which may not even be carbon tables, the rebuild may fail in many situations. (For example, we cannot block data management on a non-carbon table, which will affect the MV rebuilding.)
If the rebuilding of an MV fails, carbondata should revert the operation.
Rebuild MV automatically
We can create non-deferred-rebuild MV.
Once the data is loaded in base table, the MV rebuilding will be triggered immediately. The two procedures are considered as a transaction.
==> may not support MV in this transaction? (mention the requirement in MV feature not in this design)
If we do not support concurrent rebuilding for MV, it means that we will not support concurrent data loading for carbon table if it has non-deferred-rebuild MV.
Compaction for MV
To accelerate query performance, we may need to do compaction for MV. Since the segments between the fact table and the MV are in 1-to-1 mapping, we want compaction to keep this mapping. For example, compaction of the fact table's segments #0, #1, #2 into #0.1 will result in a corresponding compaction of the MV's segments #0, #1, #2 into #0.1.
Segment management for MV
Support index datamap on MV
We will support index datamap on MV.
Others Pending
If a table have MV, will it