Skip to content

Instantly share code, notes, and snippets.

@MarkPryceMaherMSFT
Last active September 6, 2022 18:10
Show Gist options
  • Save MarkPryceMaherMSFT/15309b956cc6ebd528449ba0fdbb8832 to your computer and use it in GitHub Desktop.
Example of reading Delta Lake with Azure Synapse Dedicated SQL Pool. This is a POC, just to see if this is possible. *NON-PRODUCTION CODE*
/*
IF OBJECT_ID('mpmtest') IS NOT NULL
BEGIN;
DROP TABLE mpmtest
END
declare @path varchar(400), @dt datetime2, @credential varchar(500),@outputable varchar(500),@display int, @debug int;
set @path = 'https://cstestwss.blob.core.windows.net/csetestws/delta/demo/'
--set @dt = convert(datetime2,'2022/07/06 18:37:00'); --getdate(); -- for time travel --
set @dt = getdate(); -- for time travel --
set @credential = 'IDENTITY= ''Shared Access Signature'', SECRET = ''___SECRET____''';
set @outputable = 'mpmtest' -- leave empty for temp table
set @display = 1; -- if 1 display results
set @debug = 1;
exec delta @path,@dt,@credential,@outputable,@display, @debug
--select count(*) from mpmtest;
*/
-- =====================================================================================
-- [dbo].[delta]
-- Proof-of-concept reader for Delta Lake tables from an Azure Synapse Dedicated SQL
-- Pool. It reconstructs the set of "live" parquet data files for the table as of a
-- given point in time by parsing the latest checkpoint (if any) plus the _delta_log
-- JSON commit files, then loads those parquet files into @dest_table with COPY INTO.
-- *NON-PRODUCTION CODE* (per the gist description).
--
-- Parameters:
--   @folder      - URL of the Delta table root folder; the code appends
--                  '_delta_log/...' directly, so it must end with '/'
--   @dt          - point-in-time for time travel; NULL => now (latest version)
--   @credential  - body of a CREDENTIAL = ( ... ) clause for COPY INTO,
--                  e.g. 'IDENTITY=''Shared Access Signature'', SECRET=''...''';
--                  NULL or '' => default (AAD pass-through) authentication
--   @dest_table  - target table name; NULL => '#tmp_output' (session temp table)
--   @display     - 1 = SELECT the loaded rows at the end, 0 = don't
--   @debug       - 1 = print dynamic SQL and select intermediate results, 0 = quiet
--
-- Side effects: creates/drops several session temp tables; creates @dest_table via
-- COPY INTO ... AUTO_CREATE_TABLE = 'ON'; prints timing information.
-- =====================================================================================
create PROC [dbo].[delta]
@folder [varchar](4000),      -- Delta table root URL (must end with '/')
@dt [datetime2],              -- time-travel timestamp; NULL = latest
@credential [varchar](4000),  -- CREDENTIAL clause body; NULL/'' = AAD auth
@dest_table [varchar](4000),  -- output table; NULL = '#tmp_output'
@display int, -- 0 = don't display results, 1 = display
@debug int -- 0 = no debug output, 1 = debug mode
AS
begin
-- version info
-- 0.1 - Very basic, but sort of worked.
-- 0.2 - Added support for remove files.
-- 0.3 - Fixed bug in authentication
-- 0.4 - Improved performance in handling add/remove files
-- 0.5 - Added support for checkpoint files
-- 0.6 - Changed the json handling, added latest checkpoint handling
DECLARE @ts bigint,@tsmil bigint, @json varchar(8000), @tsql varchar(8000);       -- @ts: @dt as epoch milliseconds; @tsql: dynamic COPY INTO text
DECLARE @json_remove varchar(8000), @tsql_checkpoint varchar(8000);               -- dynamic SQL for the checkpoint parquet load
DECLARE @td_checkpoint1 datetime2, @td_checkpoint2 datetime2;                     -- start/end timestamps for timing prints
DECLARE @last_checkpoint varchar(25), @jsonversionchar varchar(25),@checkpointversion varchar(25) ; -- checkpoint version + padded wildcard built from it
DECLARE @output_sql varchar(max);                                                 -- final SELECT of @dest_table
DECLARE @checkpoint_file_count bigint;
-- default: AAD authentication when no credential is supplied;
-- otherwise wrap the supplied text in a CREDENTIAL = ( ... ) clause for COPY INTO
IF (@credential IS NULL)
SET @credential = ''
IF (@credential <> '')
SET @credential = ' CREDENTIAL = (' + @credential + ')'
-- default : display results
IF @display IS NULL
set @display = 1
-- default : debug is off
IF @debug IS NULL
SET @debug = 0
-- if the datetime is null, get the latest version
IF (@dt IS NULL)
set @dt = getdate()
-- Convert @dt to milliseconds since 1970-01-01 (the unit Delta logs use for
-- modificationTime / deletionTimestamp / commitInfo.timestamp).
-- DATEDIFF returns int, which would overflow on ms-since-1970, so build the
-- value in two parts: whole days' seconds * 1000, plus ms within the day.
set @ts = datediff( s, '1970/1/1', convert(date,@dt) )
set @ts = @ts * 1000;
set @tsmil = datediff(ms, convert(date,@dt) , @dt)
set @ts = @ts + @tsmil;
if (@dest_table IS NULL)
set @dest_table = '#tmp_output'
---- Holds the raw json information from the _last_checkpoint pointer file ----
IF OBJECT_ID('tempdb..#last_checkpoint') IS NOT NULL
DROP TABLE #last_checkpoint
create table #last_checkpoint
(
info varchar(max)
) with (distribution=round_robin, heap)
------ read _last_checkpoint
-- This gives us the checkpoint if it exists, this gives us the starting point
-- for the json files.
-- The 0x0b field/quote terminators and 0x0d row terminator are chosen so that
-- (presumably) no character in the JSON matches them, making COPY load each
-- line of the file as a single varchar value -- NOTE(review): confirm this
-- holds for all log encodings.
set @tsql = '
copy into #last_checkpoint (info)
from ''' + @folder + '_delta_log/_last_checkpoint''
with ( ' + @credential + ' ,FILE_TYPE = ''CSV''
,fieldterminator =''0x0b'' ,fieldquote = ''0x0b'' ,rowterminator = ''0x0d'' ) '
if @debug = 1
print @tsql;
set @td_checkpoint1 = getdate();
exec(@tsql);
set @td_checkpoint2 = getdate();
print 'Loading _last_checkpoint files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'
if @debug = 1
begin
select '_Last_checkpoint', * from #last_checkpoint
end
-- Extract the checkpoint version; fall back to '00' (no checkpoint yet)
-- when the pointer file is missing or has no version field.
select @checkpointversion = ISNULL(JSON_VALUE(info,'$.version'),'00') from #last_checkpoint;
if NOT EXISTS(SELECT * FROM #last_checkpoint)
BEGIN
SET @checkpointversion = '00'
END
PRINT 'checkpointversion=' +@checkpointversion
-- 20-character zero template; used below to build a zero-padded
-- version prefix + '*' wildcard for the JSON file COPY
set @jsonversionchar = '00000000000000000000';
IF OBJECT_ID('tempdb..#delta_checkpoint') IS NOT NULL
DROP TABLE #delta_checkpoint
-- Raw rows from the checkpoint parquet file(s); each row carries exactly one
-- non-null action column (txn / add / remove / metaData / protocol) as JSON text.
create table #delta_checkpoint
(
txn varchar(max),
[add] varchar(max),
[remove] varchar(max),
metaData varchar(max),
protocol varchar(max)
) with (distribution=round_robin, heap)
--- Code to pull the add / remove files from the checkpoint files.
-- NOTE(review): the '*.checkpoint.parquet' wildcard reads ALL checkpoints,
-- not just the latest one -- verify duplicates are handled by the DISTINCT below.
set @tsql_checkpoint = '
copy into #delta_checkpoint from ''' + @folder + '_delta_log/*.checkpoint.parquet''
with ( ' + @credential + ' ,FILE_TYPE = ''PARQUET'', AUTO_CREATE_TABLE = ''ON'' ) '
if @debug = 1
BEGIN
print @tsql_checkpoint;
END
set @td_checkpoint1 = getdate()
exec(@tsql_checkpoint);
set @td_checkpoint2 = getdate()
print 'Loading checkpoint files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'
SELECT @checkpoint_file_count = count(*) from #delta_checkpoint
if @debug = 1
BEGIN
print 'No of checkpoint files ' + convert(varchar(10),@checkpoint_file_count);
SELECT * FROM #delta_checkpoint order by [add],[remove]
END
-- Holds the add/remove file list from the checkpoint files (later also the
-- entries parsed out of the JSON commit logs) ---------
IF OBJECT_ID('tempdb..#delta_files_checkpoint') IS NOT NULL
BEGIN;
print 'Dropping table #delta_files_checkpoint'
DROP TABLE #delta_files_checkpoint
END
create table #delta_files_checkpoint
(
fname varchar(99),  -- NOTE(review): relative parquet path; 99 chars may truncate partitioned paths -- confirm
ts bigint,          -- action timestamp, epoch milliseconds
dt datetime2,       -- same timestamp as datetime2 (second precision)
[action] char(1)    -- 'A' = add, 'R' = remove
) with (distribution=round_robin, heap)
-- create a table of add/remove files with the datetimestamp.
-- Each checkpoint row is either an add or a remove: when add.path is NULL
-- the remove column is used instead, and the action flag is set accordingly.
INSERT INTO #delta_files_checkpoint
select DISTINCT
CASE WHEN JSON_VALUE([add],'$.path') is NULL THEN
JSON_VALUE([remove],'$.path')
ELSE
JSON_VALUE([add],'$.path')
END fname,
CASE
WHEN JSON_VALUE([add],'$.path') is NULL THEN
convert(bigint,JSON_VALUE([remove],'$.deletionTimestamp'))
ELSE
convert(bigint,JSON_VALUE([add],'$.modificationTime'))
END updatetime,
CASE
WHEN JSON_VALUE([add],'$.path') is NULL THEN
dateadd(s, convert(bigint,JSON_VALUE([remove],'$.deletionTimestamp')) / 1000 , '1970/1/1')
ELSE
dateadd(s, convert(bigint,JSON_VALUE([add],'$.modificationTime')) / 1000 , '1970/1/1')
END updatetimedt,
CASE WHEN JSON_VALUE([add],'$.path') is NULL THEN
'R'
ELSE
'A'
END [action]
from #delta_checkpoint
-- rows that were neither adds nor removes (txn/metaData/protocol) have no path
DELETE from #delta_files_checkpoint where fname is null
if @debug = 1
SELECT 'checkpoint', * , @ts, @dt FROM #delta_files_checkpoint order by dt desc
-- remove files after the given time (time travel: ignore actions newer than @dt)
DELETE FROM #delta_files_checkpoint WHERE ts > @ts
if @debug = 1
SELECT 'checkpoint filtered', * , @ts, @dt FROM #delta_files_checkpoint order by dt desc
---- Holds the raw json commit-log lines, one row per line -----------------------------
IF OBJECT_ID('tempdb..#delta_json') IS NOT NULL
BEGIN;
DROP TABLE #delta_json
END
create table #delta_json
(
jsoninput varchar(max)
) with (distribution=round_robin, heap)
----------------------------------------------------------------
-- Build a wildcard file pattern from the checkpoint version: right-align the
-- version digits in the 20-char zero template, then replace the final character
-- with '*' so COPY matches the commit logs at/after the checkpoint (e.g.
-- version '12' -> '000000000000000000001*'-style pattern). This avoids
-- re-reading logs already summarised by the checkpoint.
set @jsonversionchar= left(substring(@jsonversionchar,0,len(@jsonversionchar)-len(@checkpointversion)+1) + @checkpointversion,len(@jsonversionchar)-1) + '*'
-- Read all the delta transaction logs; we can't filter out things by file name.
set @tsql = '
copy into #delta_json (jsoninput)
from ''' + @folder + '_delta_log/'+ @jsonversionchar +'.json''
with ( ' + @credential + ' ,FILE_TYPE = ''CSV''
,fieldterminator =''0x0b'' ,fieldquote = ''0x0b'' ,rowterminator = ''0x0d'' ) '
if @debug = 1
print @tsql;
set @td_checkpoint1 = getdate();
exec(@tsql);
set @td_checkpoint2 = getdate();
print 'Loading json files took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'
if @debug = 1
SELECT 'JSON Files', * , @ts, @dt FROM #delta_json
---- delta file details: commit-log lines that fall within the time-travel window
IF OBJECT_ID('tempdb..#delta_active_json') IS NOT NULL
BEGIN;
DROP TABLE #delta_active_json
END
create table #delta_active_json
(
jsoninput varchar(max) ,
ts bigint,          -- commitInfo.timestamp, epoch milliseconds
dt datetime2,
[action] char(1)    -- unused here; filled with '' (add/remove is parsed later)
) with (distribution=round_robin, heap)
-----------------------------------------------------------
-- inserting into temp table, so the date and time is associated to each row;
-- only keep commits strictly before the requested timestamp
insert into #delta_active_json
select
convert(varchar(max),jsoninput)
, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) as ts
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1')
,''
from #delta_json
WHERE convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) < @ts
if @debug = 1
select 'json filtered by date',*, @ts, @dt from #delta_active_json order by dt desc
-- left in for debugging -- shows each version of the JSON
if @debug = 1
begin
select 'not filtered',
convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) as ts
, convert(varchar(8000),jsoninput)
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1')
,ROW_NUMBER() OVER(PARTITION BY NULL ORDER BY JSON_VALUE(jsoninput,'$.commitInfo.timestamp') DESC) AS "Row Number"
,convert(varchar(8000),jsoninput)
, @ts, @dt
from #delta_json
order by convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) desc
end
---------------------------------------------------------
-- insert the JSON adds and removes to the existing list from checkpoint.
-- The path is pulled out by raw string slicing: split each commit line on ','
-- and take 67 chars starting 7 past 'path' (skips '"path":"').
-- NOTE(review): this assumes un-partitioned, fixed-length (GUID-style)
-- 'part-.....parquet' file names; partitioned tables or different name
-- lengths would truncate or over-read -- confirm against real logs.
insert into #delta_files_checkpoint
select substring(value,charindex('path',value)+7 ,67) [path]
, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td
,'A' as [action]
from #delta_active_json
cross apply STRING_SPLIT(jsoninput,',')
where value like '%path%' and charindex('add',value) > 0
union all
select substring(value,charindex('path',value)+7 ,67)
, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td
, 'R'
from #delta_active_json
cross apply STRING_SPLIT(jsoninput,',')
where value like '%path%' and charindex('remove',value) > 0
if @debug = 1
begin
-- all the adds and removes from the json (same parsing as the INSERT above)
select 'adds', substring(value,charindex('path',value)+7 ,67) [path]
, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td
,'A' as [action] , @ts, @dt
from #delta_active_json
cross apply STRING_SPLIT(jsoninput,',')
where value like '%path%' and charindex('add',value) > 0
select 'removes', substring(value,charindex('path',value)+7 ,67)
, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) ts
, dateadd(s, convert(bigint,JSON_VALUE(jsoninput,'$.commitInfo.timestamp')) / 1000 , '1970/1/1') td
, 'R', @ts, @dt
from #delta_active_json
cross apply STRING_SPLIT(jsoninput,',')
where value like '%path%' and charindex('remove',value) > 0
select 'New list with adds and removes', *, @ts, @dt from #delta_files_checkpoint order by ts desc;
select distinct 'distinct list ', fname, dt, [action], @ts, @dt from #delta_files_checkpoint order by dt desc;
end
---- Final live-file list: every added file that was never removed ----
IF OBJECT_ID('tempdb..#delta_files') IS NOT NULL
BEGIN;
DROP TABLE #delta_files
END
create table #delta_files
(
filenames varchar(max), rownumber bigint  -- rownumber drives the batching loop below
) with (distribution=round_robin, heap)
insert into #delta_files
select fname,
ROW_NUMBER() OVER(PARTITION BY NULL ORDER BY fname DESC)
from #delta_files_checkpoint where [action] = 'A' and
fname not in (select fname from #delta_files_checkpoint where [action] = 'R')
if @debug = 1
select '#delta_files', * from #delta_files;
if @debug = 1
select 'parquet to read ', fname,ts, @ts, dt, @dt from #delta_files_checkpoint where [action] = 'A' and
fname not in (select fname from #delta_files_checkpoint where [action] = 'R')
--where ts < @ts
--- This batching code might appear over kill, but STRING_AGG falls over when the string
-- is too long.. so I am batching up the inserts.
DECLARE @flist varchar(max);
DECLARE @batchsize int = 100000;      -- max files per generated COPY INTO statement
DECLARE @iParquetFileCount int =0;
DECLARE @iParquetFilePos int =1;      -- 1-based cursor into #delta_files.rownumber
select @iParquetFileCount = count(*) from #delta_files
IF OBJECT_ID('tempdb..#_files') IS NOT NULL
DROP TABLE #_files
create table #_files
( fname varchar(999) ) with (distribution=round_robin,heap)
---- creating batches of COPY INTO statements: each iteration takes the next
---- @batchsize+1 file names, aggregates them into one comma-separated FROM
---- list, and executes a single COPY INTO for the batch
while(@iParquetFilePos <= @iParquetFileCount)
BEGIN
INSERT INTO #_files
SELECT [filenames] FROM #delta_files where rownumber between @iParquetFilePos and @iParquetFilePos + @batchsize
-- Had issues with STRING_AGG if the string gets too long
SELECT @flist = 'COPY INTO ' + @dest_table + ' FROM '
+ STRING_AGG(CAST(''''+@folder+fname+'''' AS VARCHAR(MAX)), ',
') + ' with ( ' + @credential + '
,FILE_TYPE = ''PARQUET'' , AUTO_CREATE_TABLE = ''ON'' )'
FROM #_files
if @debug =1
select '_filelist', * from #_files;
if @debug =1 print @flist;
if @debug =1 print len(@flist);
set @td_checkpoint1 = getdate();
exec(@flist);
set @td_checkpoint2 = getdate();
print 'Loading batch of parquet ' + convert(varchar(10),@iParquetFilePos) + '->' + convert(varchar(10), @iParquetFilePos + @batchsize ) + ' took ' + convert(varchar(50),datediff(ms, @td_checkpoint1,@td_checkpoint2)) + ' ms'
truncate table #_files;
set @iParquetFilePos = @iParquetFilePos + @batchsize +1;
END
-- if there is no file, there is no table and this causes a problem:
-- guard the final SELECT with an existence check (temp vs permanent table
-- need different OBJECT_ID lookups)
if charindex('#',@dest_table) > 0
set @output_sql = 'IF OBJECT_ID(''tempdb..' + @dest_table + ''') IS NOT NULL SELECT * from ' + @dest_table + ';'
ELSE
set @output_sql = 'IF OBJECT_ID(''' + @dest_table + ''') IS NOT NULL SELECT * from ' + @dest_table + ';'
if @debug = 1
PRINT @output_sql;
if @display = 1
exec(@output_sql );
end
@MarkPryceMaherMSFT
Copy link
Author

Updated to add remove files... Also the performance is terrible!!!!

@MarkPryceMaherMSFT
Copy link
Author

I was 'inspired' by Jovan - the credential handling is much simpler and more generic. Also I added STRING_AGG when generating the final copy into statement

@MarkPryceMaherMSFT
Copy link
Author

Added some performance improvements by taking out all the nasty string handling

@MarkPryceMaherMSFT
Copy link
Author

Thanks Daniel - He spotted a problem with temp table

@MarkPryceMaherMSFT
Copy link
Author

Thanks to Rodrigo for the help understanding the checkpoints.
Thanks to Luke for the ideas.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment