Skip to content

Instantly share code, notes, and snippets.

@in2bi
Last active July 4, 2016 03:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save in2bi/9aa7a65753b4def78d75 to your computer and use it in GitHub Desktop.
Save in2bi/9aa7a65753b4def78d75 to your computer and use it in GitHub Desktop.
Biml file toe create Incremental load
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
<Connections>
<AdoNetConnection Name="Source"
ConnectionString="Data Source=.;Initial Catalog=CDCTest;Integrated Security=True;"
Provider="System.Data.SqlClient.SqlConnection, System.Data, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
CreateInProject="true"
/>
<AdoNetConnection Name="Destination"
ConnectionString="Data Source=.;Initial Catalog=CDCTest;Integrated Security=True;"
Provider="System.Data.SqlClient.SqlConnection, System.Data, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
CreateInProject="true"
/>
</Connections>
<Packages>
<Package Name="Incremental Load" ConstraintMode="Linear" ProtectionLevel="EncryptSensitiveWithUserKey">
<Variables>
<Variable Name="CDC_State" DataType="String"></Variable>
</Variables>
<Tasks>
<CustomTask Name="CDC Get processing range"
CreationName="Attunity.SqlServer.CDCControlTask.CdcControlTask, Attunity.SqlServer.CDCControlTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c">
<ObjectData>
<![CDATA[<CDCControlTask
Connection="Source"
TaskOperation="GetProcessingRange"
OperationParameter=""
StateConnection="Destination"
StateVariable="User::CDC_State"
AutomaticStatePersistence="True"
StateName="CDC_State"
StateTable="[dbo].[cdc_states]"
CommandTimeout="30"
ChangeDetectionRetryInterval="10"
ChangeDetectionTimeout="60" />]]>
</ObjectData>
</CustomTask>
<Container Name="SEQ DepartmentGroup" ConstraintMode="Linear">
<Tasks>
<ExecuteSQL Name="SQL Create staging tables" ConnectionName="Destination">
<DirectInput><![CDATA[
IF NOT EXISTS (SELECT * FROM sys.objects WHERE name='stg_DimDepartmentGroup_DELETES')
SELECT TOP 0 *
INTO stg_DimDepartmentGroup_DELETES
FROM DimDepartmentGroup_Destination
IF NOT EXISTS (SELECT * FROM sys.objects WHERE name='stg_DimDepartmentGroup_UPDATES')
SELECT TOP 0 *
INTO stg_DimDepartmentGroup_UPDATES
FROM DimDepartmentGroup_Destination
]]></DirectInput>
</ExecuteSQL>
<Dataflow Name="DFT Incremental load">
<Transformations>
<CustomComponent Name="CDCSource"
ComponentClassId="{874F7595-FB5F-40FF-96AF-FBFF8250E3EF}"
ComponentTypeName="Attunity.SqlServer.CDCSrc.CdcSourceComponent, Attunity.SqlServer.CDCSrc, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
ContactInfo="Attunity Ltd.; All Rights Reserved; http://www.attunity.com;"
UsesDispositions="true"
Version="2"
ValidateExternalMetadata="false">
<Annotations>
<Annotation AnnotationType="Description">Specifies the content of the metadata columns and the rows returned. Modes starting with ‘All’ return all changes and modes starting with ‘Net’ return net changes.</Annotation>
</Annotations>
<Connections>
<Connection Name="Connection" ConnectionName="Source" />
</Connections>
<CustomProperties>
<CustomProperty Name="StateVariable"
DataType="String"
TypeConverter="Attunity.SqlServer.CDCSrc.PackageVariablesTypeConverter, Attunity.SqlServer.CDCSrc, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
SupportsExpression="true"
Description="An SSIS string package variable to store the CDC state of the current CDC state."
>User::CDC_State</CustomProperty>
<CustomProperty Name="CDCProcessingMode"
DataType="Int32"
TypeConverter="Attunity.SqlServer.CDCSrc.CdcProcessingMode, Attunity.SqlServer.CDCSrc, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
SupportsExpression="true"
Description="Specifies the content of the metadata columns and the rows returned. Modes starting with ‘All’ return all changes and modes starting with ‘Net’ return net changes."
>2</CustomProperty>
<CustomProperty Name="CaptureInstance"
DataType="String"
SupportsExpression="true"
Description="The name of the CDC capture instance. By default the capture instance name is in the form of &quot;&lt;owner&gt;_&lt;table&gt;&quot;. The actual table that stores the changes is the CDC table named &quot;cdc&quot;.&quot;&lt;capture-instance&gt;_CT&quot;"
>dbo_DimDepartmentGroup_CDC</CustomProperty>
<CustomProperty Name="CommandTimeout"
DataType="Int32"
SupportsExpression="true"
Description="The number of seconds before a command times out. A value of 0 indicates an infinite time out."
>30</CustomProperty>
<CustomProperty Name="ReprocessingIndication"
DataType="Boolean"
SupportsExpression="true"
Description="If true, a special output column called ‘__$reprocessing ’ is added. This output column is true for each data row during the initial processing range, or when the previous CDC run&#xA;was stopped without finishing. The default value is false (the __$reprocessing output column is not generated). This special output column allows the SSIS developer to handle consistency errors differently when working on the Initial Processing Range or the previous CDC run was stopped."
>false</CustomProperty>
</CustomProperties>
<OutputPaths>
<OutputPath Name="Output" ErrorRowDisposition="FailComponent" TruncationRowDisposition="FailComponent">
<OutputColumns>
<OutputColumn Name="__$start_lsn" ExternalMetadataColumnName="__$start_lsn" DataType="Binary" Length="10" ErrorRowDisposition="FailComponent" TruncationRowDisposition="FailComponent"/>
<OutputColumn Name="__$operation" ExternalMetadataColumnName="__$operation" DataType="Int32" ErrorRowDisposition="FailComponent" TruncationRowDisposition="FailComponent" />
<OutputColumn Name="__$update_mask" ExternalMetadataColumnName="__$update_mask" DataType="Binary" Length="128" ErrorRowDisposition="FailComponent" TruncationRowDisposition="FailComponent"/>
<OutputColumn Name="DepartmentGroupKey" ExternalMetadataColumnName="DepartmentGroupKey" ErrorRowDisposition="FailComponent" TruncationRowDisposition="FailComponent" />
<OutputColumn Name="ParentDepartmentGroupKey" ExternalMetadataColumnName="ParentDepartmentGroupKey" DataType="Int32" ErrorRowDisposition="FailComponent" TruncationRowDisposition="FailComponent" />
<OutputColumn Name="DepartmentGroupName" ExternalMetadataColumnName="DepartmentGroupName" DataType="String" Length="50" ErrorRowDisposition="FailComponent" TruncationRowDisposition="FailComponent" />
</OutputColumns>
<ExternalColumns>
<ExternalColumn Name="__$start_lsn" DataType="Binary" Length="10" />
<ExternalColumn Name="__$operation" DataType="Int32" />
<ExternalColumn Name="__$update_mask" DataType="Binary" Length="128" />
<ExternalColumn Name="DepartmentGroupKey" />
<ExternalColumn Name="ParentDepartmentGroupKey" DataType="Int32" />
<ExternalColumn Name="DepartmentGroupName" DataType="String" Length="50" />
</ExternalColumns>
</OutputPath>
<OutputPath Name="ErrorOutput" IsErrorOutput="true">
<OutputColumns>
<OutputColumn Name="__$start_lsn" DataType="Binary" Length="10" />
<OutputColumn Name="__$operation" DataType="Int32" />
<OutputColumn Name="__$update_mask" DataType="Binary" Length="128" />
<OutputColumn Name="DepartmentGroupKey" DataType="Int32" />
<OutputColumn Name="ParentDepartmentGroupKey" DataType="Int32" />
<OutputColumn Name="DepartmentGroupName" DataType="String" Length="50" />
</OutputColumns>
</OutputPath>
</OutputPaths>
</CustomComponent>
<CustomComponent Name="CDCSplitter"
ComponentClassId="{874F7595-FB5F-40FF-96AF-FBFF8250E3EF}"
ComponentTypeName="Attunity.SqlServer.CDCSplit.CdcSplitterComponent, Attunity.SqlServer.CDCSplit, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
ContactInfo="Attunity Ltd.; All Rights Reserved; http://www.attunity.com;"
UsesDispositions="true"
Version="2"
ValidateExternalMetadata="false">
<Annotations>
<Annotation AnnotationType="Description">Directs a stream of net change records into different outputs based on the type of the change (Insert, Delete and Update). This allows specific handling for different types of change records.</Annotation>
</Annotations>
<InputPaths>
<InputPath Identifier="Input" OutputPathName="CDCSource.Output">
<InputColumns>
<InputColumn SourceColumn="__$start_lsn" />
<InputColumn SourceColumn="__$operation" />
<InputColumn SourceColumn="__$update_mask" />
<InputColumn SourceColumn="DepartmentGroupKey" />
<InputColumn SourceColumn="ParentDepartmentGroupKey" />
<InputColumn SourceColumn="DepartmentGroupName" />
</InputColumns>
</InputPath>
</InputPaths>
<OutputPaths>
<OutputPath Name="InsertOutput">
<Annotations>
<Annotation AnnotationType="Description">Output type - Insert.</Annotation>
</Annotations>
<CustomProperties>
<CustomProperty Name="OutputType" DataType="Int32"
TypeConverter="Attunity.SqlServer.CDCSplit.OutputType, Attunity.SqlServer.CDCSplit, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
>0</CustomProperty>
</CustomProperties>
<OutputColumns>
<OutputColumn Name="__$start_lsn" DataType="Binary" Length="10" />
<OutputColumn Name="__$operation" DataType="Int32" />
<OutputColumn Name="__$update_mask" DataType="Binary" Length="128" />
<OutputColumn Name="DepartmentGroupKey" DataType="Int32" />
<OutputColumn Name="ParentDepartmentGroupKey" DataType="Int32" />
<OutputColumn Name="DepartmentGroupName" DataType="String" Length="50" />
</OutputColumns>
<ExternalColumns />
</OutputPath>
<OutputPath Name="UpdateOutput">
<Annotations>
<Annotation AnnotationType="Description">Output type - Update.</Annotation>
</Annotations>
<CustomProperties>
<CustomProperty Name="OutputType" DataType="Int32"
TypeConverter="Attunity.SqlServer.CDCSplit.OutputType, Attunity.SqlServer.CDCSplit, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
>1</CustomProperty>
</CustomProperties>
<OutputColumns>
<OutputColumn Name="__$start_lsn" DataType="Binary" Length="10" />
<OutputColumn Name="__$operation" DataType="Int32" />
<OutputColumn Name="__$update_mask" DataType="Binary" Length="128" />
<OutputColumn Name="DepartmentGroupKey" DataType="Int32" />
<OutputColumn Name="ParentDepartmentGroupKey" DataType="Int32" />
<OutputColumn Name="DepartmentGroupName" DataType="String" Length="50" />
</OutputColumns>
</OutputPath>
<OutputPath Name="DeleteOutput">
<Annotations>
<Annotation AnnotationType="Description">Output type - Delete.</Annotation>
</Annotations>
<CustomProperties>
<CustomProperty Name="OutputType" DataType="Int32"
TypeConverter="Attunity.SqlServer.CDCSplit.OutputType, Attunity.SqlServer.CDCSplit, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
>2</CustomProperty>
</CustomProperties>
<OutputColumns>
<OutputColumn Name="__$start_lsn" DataType="Binary" Length="10" />
<OutputColumn Name="__$operation" DataType="Int32" />
<OutputColumn Name="__$update_mask" DataType="Binary" Length="128" />
<OutputColumn Name="DepartmentGroupKey" DataType="Int32" />
<OutputColumn Name="ParentDepartmentGroupKey" DataType="Int32" />
<OutputColumn Name="DepartmentGroupName" DataType="String" Length="50" />
</OutputColumns>
</OutputPath>
<OutputPath Name="ErrorOutput" IsErrorOutput="true">
<Annotations>
<Annotation AnnotationType="Description">Output type - Error.</Annotation>
</Annotations>
<CustomProperties>
<CustomProperty Name="OutputType" DataType="Int32"
TypeConverter="Attunity.SqlServer.CDCSplit.OutputType, Attunity.SqlServer.CDCSplit, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
>3</CustomProperty>
</CustomProperties>
<OutputColumns>
<OutputColumn Name="__$start_lsn" DataType="Binary" Length="10" />
<OutputColumn Name="__$operation" DataType="Int32" />
<OutputColumn Name="__$update_mask" DataType="Binary" Length="128" />
<OutputColumn Name="DepartmentGroupKey" DataType="Int32" />
<OutputColumn Name="ParentDepartmentGroupKey" DataType="Int32" />
<OutputColumn Name="DepartmentGroupName" DataType="String" Length="50" />
</OutputColumns>
<ExternalColumns />
</OutputPath>
</OutputPaths>
</CustomComponent>
<AdoNetDestination Name="ADO_DST Destination" ConnectionName="Destination">
<InputPath OutputPathName="CDCSplitter.InsertOutput" />
<ExternalTableOutput Table="[dbo].[DimDepartmentGroup_Destination]" />
</AdoNetDestination>
<AdoNetDestination Name="ADO_DST Staging UPDATES" ConnectionName="Destination">
<InputPath OutputPathName="CDCSplitter.UpdateOutput" />
<ExternalTableOutput Table="[dbo].[stg_DimDepartmentGroup_UPDATES]" />
</AdoNetDestination>
<AdoNetDestination Name="ADO_DST Staging DELETES" ConnectionName="Destination">
<InputPath OutputPathName="CDCSplitter.DeleteOutput" />
<ExternalTableOutput Table="[dbo].[stg_DimDepartmentGroup_UPDATES]" />
</AdoNetDestination>
</Transformations>
</Dataflow>
<ExecuteSQL Name="SQL Handle updates in batch" ConnectionName="Destination">
<DirectInput><![CDATA[
UPDATE dest
SET
dest.ParentDepartmentGroupKey = stg.ParentDepartmentGroupKey
des.DepartmentGroupName = stg.DepartmentGroupName
FROM
DimDepartmentGroup_Destination dest,
stg_DimDepartmentGroup_UPDATES stg
WHERE
dest.DepartmentGroupKey = stg.DepartmentGroupKey
]]></DirectInput>
</ExecuteSQL>
<ExecuteSQL Name="SQL Handle deletes in batch" ConnectionName="Destination">
<DirectInput><![CDATA[
DELETE FROM DimDepartmentGroup_Destination
WHERE DepartmentGroupKey IN
(
SELECT DepartmentGroupKey
FROM stg_DimDepartmentGroup_UPDATES
)
]]></DirectInput>
</ExecuteSQL>
<ExecuteSQL Name="SQL Drop staging tables" ConnectionName="Destination">
<DirectInput><![CDATA[
DROP TABLE stg_DimDepartmentGroup_UPDATES
DROP TABLE stg_DimDepartmentGroup_DELETES
]]></DirectInput>
</ExecuteSQL>
<!--Close Container-->
</Tasks></Container>
<CustomTask Name="CDC Mark processed range"
CreationName="Attunity.SqlServer.CDCControlTask.CdcControlTask, Attunity.SqlServer.CDCControlTask, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c">
<ObjectData>
<![CDATA[<CDCControlTask
Connection="Source"
TaskOperation="MarkProcessedRange"
OperationParameter=""
StateConnection="Destination"
StateVariable="User::CDC_State"
AutomaticStatePersistence="True"
StateName="CDC_State"
StateTable="[dbo].[cdc_states]"
CommandTimeout="30"
ChangeDetectionRetryInterval="10"
ChangeDetectionTimeout="60" />]]>
</ObjectData>
</CustomTask>
</Tasks>
</Package>
</Packages>
</Biml>
@prathmanu
Copy link

Great template for newbies, I tried extending it with foreach loop for mutiple tables. I am getting Error:

0xC0047062 at DFT Incremental load_Source1, CDCSplitter [92]: System.ArgumentException: Value does not fall within the expected range.
at Microsoft.SqlServer.Dts.Pipeline.Wrapper.IDTSBuffer100.DirectRow(Int32 hRow, Int32 lOutputID)
at Attunity.SqlServer.CDCSplit.CdcSplitterComponent.ProcessInput(Int32 inputId, PipelineBuffer buffer)
at Microsoft.SqlServer.Dts.Pipeline.ManagedComponentHost.HostProcessInput(IDTSManagedComponentWrapper100 wrapper, Int32 inputID, IDTSBuffer100 pDTSBuffer, IntPtr bufferWirePacket)
Error: 0xC0047022 at DFT Incremental load_Source1, SSIS.Pipeline: SSIS Error Code DTS_E_PROCESSINPUTFAILED. The ProcessInput method on component "CDCSplitter" (92) failed with error code 0x80070057 while processing input "CDCSource_Output_CDCSplitter" (94). The identified component returned an error from the ProcessInput method. The error is specific to the component, but the error is fatal and will cause the Data Flow task to stop running. There may be error messages posted before this with more information about the failure.

My Implementation for CDCSplitter where I get this error is

              <Annotations>
                <Annotation AnnotationType="Description">Directs a stream of net change records into different outputs based on the type of the change (Insert, Delete and Update). This allows specific handling for different types of change records.</Annotation>
              </Annotations>
              <InputPaths>
                <InputPath Identifier="Input" OutputPathName="CDCSource.Output">
                  <InputColumns>
                    <InputColumn SourceColumn="__$start_lsn" />
                    <InputColumn SourceColumn="__$operation" />
                    <InputColumn SourceColumn="__$update_mask" />
                     <# foreach (var column in table.Columns) { #>
                    <InputColumn SourceColumn="<#=column.Name#>" />
                   <# } #>
                  </InputColumns>
                </InputPath>
              </InputPaths>
              <OutputPaths>
                <OutputPath Name="InsertOutput">
                  <Annotations>
                    <Annotation AnnotationType="Description">Output type - Insert.</Annotation>
                  </Annotations>
                  <CustomProperties>
                    <CustomProperty Name="OutputType" DataType="Int32"
                                    TypeConverter="Attunity.SqlServer.CDCSplit.OutputType, Attunity.SqlServer.CDCSplit, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
                                >0</CustomProperty>

                  </CustomProperties>
                  <OutputColumns>
                    <OutputColumn Name="__$start_lsn" DataType="Binary" Length="10" />
                    <OutputColumn Name="__$operation" DataType="Int32" />
                    <OutputColumn Name="__$update_mask" DataType="Binary" Length="128" />
                    <# foreach (var column in table.Columns) { #>
                      <# if(column.DataType!= System.Data.DbType.Int32) { #>
                    <OutputColumn Name="<#=column.Name#>" DataType="<#=column.DataType#>" Length="<#=column.Length#>" />
                   <# } #>
                       <# if(column.DataType== System.Data.DbType.Int32) { #>
                    <OutputColumn Name="<#=column.Name#>" DataType="Int32" />
                   <# } #>
                     <# } #>
                  </OutputColumns>
                  <ExternalColumns /> 
                </OutputPath>

                <OutputPath Name="UpdateOutput">
                  <Annotations>
                    <Annotation AnnotationType="Description">Output type - Update.</Annotation>
                  </Annotations>
                  <CustomProperties>
                    <CustomProperty Name="OutputType" DataType="Int32"
                                    TypeConverter="Attunity.SqlServer.CDCSplit.OutputType, Attunity.SqlServer.CDCSplit, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
                                >1</CustomProperty>
                  </CustomProperties>
                  <OutputColumns>
                    <OutputColumn Name="__$start_lsn" DataType="Binary" Length="10" />
                    <OutputColumn Name="__$operation" DataType="Int32" />
                    <OutputColumn Name="__$update_mask" DataType="Binary" Length="128" />
                     <# foreach (var column in table.Columns) { #>
                        <# if(column.DataType!= System.Data.DbType.Int32) { #>
                    <OutputColumn Name="<#=column.Name#>" DataType="<#=column.DataType#>" Length="<#=column.Length#>" />
                    <# } #>

                       <# if(column.DataType== System.Data.DbType.Int32) { #>
                    <OutputColumn Name="<#=column.Name#>" DataType="Int32"  />
                    <# } #>
                  <# } #>
                  </OutputColumns>
                </OutputPath>

                <OutputPath Name="DeleteOutput">
                  <Annotations>
                    <Annotation AnnotationType="Description">Output type - Delete.</Annotation>
                  </Annotations>
                  <CustomProperties>
                    <CustomProperty Name="OutputType" DataType="Int32"
                                    TypeConverter="Attunity.SqlServer.CDCSplit.OutputType, Attunity.SqlServer.CDCSplit, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
                                >2</CustomProperty>
                  </CustomProperties>
                  <OutputColumns>
                     <OutputColumn Name="__$start_lsn" DataType="Binary" Length="10" />
                    <OutputColumn Name="__$operation" DataType="Int32" />
                    <OutputColumn Name="__$update_mask" DataType="Binary" Length="128" />
                    <# foreach (var column in table.Columns) { #>
                       <# if(column.DataType!= System.Data.DbType.Int32) { #>
                    <OutputColumn Name="<#=column.Name#>" DataType="<#=column.DataType#>" Length="<#=column.Length#>"/>
                    <# } #>
                       <# if(column.DataType== System.Data.DbType.Int32) { #>
                    <OutputColumn Name="<#=column.Name#>" DataType="Int32" />
                    <# } #>
                   <# } #>
                  </OutputColumns>
                </OutputPath>

                <OutputPath Name="ErrorOutput" IsErrorOutput="true">
                  <Annotations>
                    <Annotation AnnotationType="Description">Output type - Error.</Annotation>
                  </Annotations>
                  <CustomProperties>
                    <CustomProperty Name="OutputType" DataType="Int32"
                                    TypeConverter="Attunity.SqlServer.CDCSplit.OutputType, Attunity.SqlServer.CDCSplit, Version=1.0.0.0, Culture=neutral, PublicKeyToken=aa342389a732e31c"
                                >3</CustomProperty>
                  </CustomProperties>

                  <OutputColumns>
                    <OutputColumn Name="__$start_lsn" DataType="Binary" Length="10" />
                    <OutputColumn Name="__$operation" DataType="Int32" />
                    <OutputColumn Name="__$update_mask" DataType="Binary" Length="128" />
                    <# foreach (var column in table.Columns) { #>
                      <# if(column.DataType!= System.Data.DbType.Int32) { #>
                    <OutputColumn Name="<#=column.Name#>" DataType="<#=column.DataType#>" Length="<#=column.Length#>" />
                    <# } #>

                       <# if(column.DataType== System.Data.DbType.Int32) { #>
                    <OutputColumn Name="<#=column.Name#>" DataType="Int32"  />
                    <# } #>
                      <# } #>
                  </OutputColumns>
                  <ExternalColumns />
                </OutputPath>

              </OutputPaths>


            </CustomComponent>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment