@santisq
Last active November 8, 2024 14:17

Setup new Azure Data Explorer Table from MyAADLogs

This document details the steps needed to create a new Azure Data Explorer table for ingested logs from Azure Active Directory.

All ingested logs from AAD are written to an ADX table named MyAADLogs. This table is overwritten over and over, hence the need for a parsing function that filters the newly ingested logs by category, builds new records from them, and writes those records to their corresponding tables.

Query MyAADLogs Table

The first step is to query the MyAADLogs table, filtering by the record.category property and expanding the properties of interest from each record. We can query the same logs using Log Analytics for comparison. For example, for NonInteractiveUserSignInLogs:

MyAADLogs
| mv-expand record = data.records
| where tostring(record.category) == 'NonInteractiveUserSignInLogs'
| extend LocationDetails = todynamic(record.properties.['location'])
| project
    TimeGenerated = todatetime(record.['time']),
    OperationName = record.['operationName'],
    OperationVersion = record.['operationVersion'],
    Category = record.['category'],
    ResultType = record.['resultType'],
    ResultSignature = record.['resultSignature'],
    ResultDescription = record.['resultDescription'],
    DurationMs = record.['durationMs'],
    CorrelationId = record.['correlationId'],
    ResourceGroup = split(record.['resourceId'], '/')[-1],
    Identity = record.['identity'],
    Location = LocationDetails['countryOrRegion'],
    AppDisplayName = record.properties.['appDisplayName'],
    AppId = record.properties.['appId'],
    AuthenticationContextClassReferences = record.properties['authenticationContextClassReferences'],
    AuthenticationDetails = record.properties['authenticationDetails'],
    AuthenticationProcessingDetails = record.properties.['authenticationProcessingDetails'],
    AuthenticationProtocol = record.properties.['authenticationProtocol'],
    AuthenticationRequirement = record.properties.['authenticationRequirement'],
    AuthenticationRequirementPolicies = record.properties.['authenticationRequirementPolicies'],
    AutonomousSystemNumber = record.properties.['autonomousSystemNumber'],
    ClientAppUsed = record.properties.['clientAppUsed'],
    CreatedDateTime = todatetime(record.properties.['createdDateTime']),
    CrossTenantAccessType = record.properties.['crossTenantAccessType'],
    DeviceDetail = record.properties.['deviceDetail'],
    HomeTenantId = record.properties['homeTenantId'],
    Id = record.properties.['id'],
    IPAddress = record.callerIpAddress,
    IsInteractive = record.properties.['isInteractive'],
    LocationDetails,
    MfaDetail = record.properties.['mfaDetail'],
    NetworkLocationDetails = record.properties.['networkLocationDetails'],
    OriginalRequestId = record.properties.['originalRequestId'],
    ProcessingTimeInMs = record.properties.['processingTimeInMilliseconds'],
    ResourceDisplayName = record.properties.['resourceDisplayName'],
    ResourceIdentity = record.properties.['resourceId'],
    ResourceServicePrincipalId = record.properties.['resourceServicePrincipalId'],
    ResourceTenantId = record.properties.['resourceTenantId'],
    RiskDetail = record.properties.['riskDetail'],
    RiskEventTypes = record.properties.['riskEventTypes'],
    RiskLevelAggregated = record.properties.['riskLevelAggregated'],
    RiskState = record.properties.['riskState'],
    SessionLifetimePolicies = record.properties.['sessionLifetimePolicies'],
    Status = record.properties.['status'],
    TokenIssuerType = record.properties.['tokenIssuerType'],
    UniqueTokenIdentifier = record.properties.['uniqueTokenIdentifier'],
    UserAgent = record.properties.['userAgent'],
    UserDisplayName = record.properties.['userDisplayName'],
    UserId = record.properties.['userId'],
    UserPrincipalName = record.properties.['userPrincipalName'],
    UserType = record.properties.['userType'],
    Type = 'AADNonInteractiveUserSignInLogs'
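
To see the mv-expand and category filter steps in isolation, here is a minimal sketch that runs against an inline datatable instead of the real table. The sample row is made up for illustration. It only assumes, as the query above does, a dynamic column named data that holds a records array.

```kusto
// Hypothetical sample shaped like one row of MyAADLogs (values are invented).
datatable(['data']: dynamic) [
    dynamic({"records": [
        {"category": "NonInteractiveUserSignInLogs", "time": "2024-01-01T00:00:00Z"},
        {"category": "AuditLogs", "time": "2024-01-01T00:05:00Z"}
    ]})
]
// One output row per element of the records array.
| mv-expand record = data.records
// Keep only the category this parsing query is responsible for.
| where tostring(record.category) == 'NonInteractiveUserSignInLogs'
| project
    TimeGenerated = todatetime(record.['time']),
    Category = tostring(record.category)
```

Only the first sample record should survive the filter, which is the behaviour the full query relies on.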

Create an ADX Function for the new query

Once we have the new query we can use the .create-or-alter function command to create a new Azure Data Explorer function. This command can also be used to update an existing function.

.create-or-alter function AADNonInteractiveUserSignInLogsParsing() {
    MyAADLogs
    | mv-expand record = data.records
    | where tostring(record.category) == 'NonInteractiveUserSignInLogs'
    | project
        ...
        ...
}

After running the control command we should see the new function listed under the database's Functions. Now we can reuse this function:

AADNonInteractiveUserSignInLogsParsing
| take 1
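
If the function does not behave as expected, it may help to check what was actually stored. A small sketch using the .show function command, with the function name from the step above:

```kusto
.show function AADNonInteractiveUserSignInLogsParsing
| project Name, Body
```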

Create the destination table

The next step is to create the table the new logs will be written to. For this we can use the .set control command:

.set AADNonInteractiveUserSignInLogs <|
    AADNonInteractiveUserSignInLogsParsing()
    | take 0

The | take 0 suffix is meant to make sure the command actually appends no records to the target table. We just need to create the schema for this new table.
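
As a quick check that the table exists with the columns produced by the parsing function, something along these lines can be run afterwards. This is only a sketch; the table name is the one created above.

```kusto
AADNonInteractiveUserSignInLogs
| getschema
| project ColumnName, ColumnType
```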

Update the Table Policy

The last step is to change the table's update policy. Here we define the source table and the query used to update the target table. For this we can use the .alter table policy update command:

.alter table AADNonInteractiveUserSignInLogs policy update
```
[
    {
        "IsEnabled": true,
        "Source": "MyAADLogs",
        "Query": "AADNonInteractiveUserSignInLogsParsing",
        "IsTransactional": true,
        "PropagateIngestionProperties": false
    }
]
```
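
To confirm the policy was applied, the current update policy of the target table can be read back. A minimal sketch, assuming the table name used above:

```kusto
.show table AADNonInteractiveUserSignInLogs policy update
```

The returned Policy column should contain the same JSON array that was passed to the .alter command.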

Add-Type -Path .\KustoTools\Microsoft.Azure.Kusto.Tools\net6.0\Kusto.Ingest.dll

$clusterUrl = 'https://mySourceCluster.eastus.kusto.windows.net'
$databaseName = 'myDb'
$tableName = 'myTable'

$factory = [Kusto.Ingest.KustoIngestFactory]::CreateDirectIngestClient(
    [Kusto.Data.KustoConnectionStringBuilder]::new(
        $clusterUrl,
        $databaseName).
    WithAadSystemManagedIdentity())

$stream = (Get-Item .\test.csv).OpenRead()
$ingestionProps = [Kusto.Ingest.KustoIngestionProperties]@{
    IngestionMapping  = [Kusto.Ingest.IngestionMapping]@{
        IngestionMappingKind = [Kusto.Data.Ingestion.IngestionMappingKind]::Csv
        # NOTE: [User] is not defined in this script; it must be a class
        # declared beforehand whose properties match the CSV columns.
        IngestionMappings    = [Kusto.Data.Common.ColumnMapping[]] @(
            [User].GetProperties() | ForEach-Object {
                [Kusto.Data.Common.ColumnMapping]@{
                    ColumnName = $_.Name
                    ColumnType = $_.PropertyType
                }
            }
        )
    }
    DatabaseName      = $databaseName
    TableName         = $tableName
    Format            = [Kusto.Data.Common.DataSourceFormat]::csv
    IgnoreFirstRecord = $true
}

try {
    $task = $factory.IngestFromStreamAsync($stream, $ingestionProps)
    $task.GetAwaiter().GetResult()
}
finally {
    ${factory}?.Dispose()
}

# For more details see:
# https://stackoverflow.com/questions/77951879/how-to-ingest-inline-into-azure-data-explorer-table-from-powershell

# Nuget Package: https://www.nuget.org/packages/Microsoft.Azure.Kusto.Tools/
# - net472 = PowerShell 5.1
# - net6.0 = PowerShell 7+
$assembly = Convert-Path .\microsoft.azure.kusto.tools.12.0.1\tools\net6.0\Kusto.Data.dll

try {
    [System.Reflection.Assembly]::LoadFrom($assembly)

    # NOTE: `Select-Object -Skip 1` is only needed if the CSV has headers,
    #       headers must be skipped for Csv ingestion.
    $data = [System.IO.File]::ReadLines('path\to\csvToIngest.csv') |
        Select-Object -Skip 1

    $clusterUrl = 'https://mySourceCluster.eastus.kusto.windows.net'
    $databaseName = 'myDb'
    $tableName = 'myTable'
    $tenantId = 'xxxx-xxxx-xxxx-xxxx-xxxx'
    # Do yourself a favor and use a Key Vault for this part :(
    $adxClientId = 'xxxx-xxxx-xxxx-xxxx-xxxx'
    $adxSecret = 's3cr3t'

    $queryProvider = [Kusto.Data.Net.Client.KustoClientFactory]::CreateCslQueryProvider(
        [Kusto.Data.KustoConnectionStringBuilder]::new($clusterUrl, $databaseName).
        WithAadApplicationKeyAuthentication($adxClientId, $adxSecret, $tenantId))

    $crp = [Kusto.Data.Common.ClientRequestProperties]::new()
    $crp.SetOption(
        [Kusto.Data.Common.ClientRequestProperties]::OptionServerTimeout,
        [TimeSpan]::FromSeconds(30))

    $null = $queryProvider.ExecuteControlCommand(
        [string]::Format(
            ".ingest inline into table {0} with (format = 'csv') <| {1}",
            $tableName, [string]::Join([Environment]::NewLine, $data)), $crp)
}
finally {
    ${queryProvider}?.Dispose()
}

$assembly = Convert-Path .\KustoTools\net472\Kusto.Data.dll

try {
    [System.Reflection.Assembly]::LoadFrom($assembly)

    $clusterUrl = 'https://mySourceCluster.eastus.kusto.windows.net'
    $databaseName = 'myDb'
    $tenantId = 'xxxx-xxxx-xxxx-xxxx-xxxx'
    $adxClientId = 'xxxx-xxxx-xxxx-xxxx-xxxx'
    $adxSecret = 's3cr3t'

    $queryProvider = [Kusto.Data.Net.Client.KustoClientFactory]::CreateCslQueryProvider(
        [Kusto.Data.KustoConnectionStringBuilder]::new($clusterUrl, $databaseName).
        WithAadApplicationKeyAuthentication($adxClientId, $adxSecret, $tenantId))

    $crp = [Kusto.Data.Common.ClientRequestProperties]::new()
    $crp.SetOption(
        [Kusto.Data.Common.ClientRequestProperties]::OptionServerTimeout,
        [TimeSpan]::FromSeconds(30))

    # Pass the request properties so the server timeout set above is applied.
    $reader = $queryProvider.ExecuteQuery('SigninLogs | take 10', $crp)
    $ds = [Kusto.Cloud.Platform.Data.ExtendedDataReader]::ToDataSet($reader)
    $dv = [System.Data.DataView]::new($ds.Tables[0])
    $dv | Out-GridView
}
finally {
    if ($queryProvider) { $queryProvider.Dispose() }
    if ($reader) { $reader.Dispose() }
    if ($ds) { $ds.Dispose() }
    if ($dv) { $dv.Dispose() }
}

Show the retention policy for a Table

.show table MyTable policy retention
| extend Policy = parse_json(Policy)
| extend SoftDeletePeriod = Policy.SoftDeletePeriod
| extend Recoverability = Policy.Recoverability
| project-away Policy, ChildEntities

Show ingestion failures on a Cluster

.show ingestion failures 
| where FailedOn > ago(30d)

Show principals assigned at Database level

.show database MyDatabase principals

Show principals assigned at Table level

Also lists the role assignment for each principal.

.show table MyTable principals

Get the schema of a Table

MyTable
| getschema 

Ingest inline CSV data

.ingest inline into table MyTable with (format = 'csv') <|
    "this","inline","CSV","gets","ingested","to","MyTable"

Create a Table

.create table MyLogs(
    Level:string,
    Timestamp:datetime,
    UserId:string,
    TraceId:string,
    Message:string,
    ProcessId:int32)

Clear a Table

.clear table MyTable data 

Delete rows from a table based on a predicate

.delete table MyTable records <|
    MyTable
    | where // my predicate to delete rows
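
Because this command permanently removes rows, it can be worth doing a dry run first. The sketch below uses the whatif property of .delete table records, which reports what would be deleted without deleting anything. The Timestamp column and the 365 day cutoff are hypothetical placeholders for your own predicate.

```kusto
.delete table MyTable records with (whatif=true) <|
    MyTable
    // 'Timestamp' is a hypothetical datetime column; substitute your own predicate.
    | where Timestamp < ago(365d)
```

Once the reported counts look right, the same command can be run without the with (whatif=true) clause.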

Grant a role assignment at Database level

  • Principals can be aadapp, aaduser, aadobject, etc.
  • ingestors is the role granted in the example; it can also be admins, viewers, etc.
  • The principal string 'aadapp=xxx-xxx-xxx-xxx-xxx;xxx-xxx-xxx-xxx-xxx' can be read as 'principalType=PrincipalGUID;TenantGUID'.

.add database MyDatabase ingestors (
    'aadapp=xxx-xxx-xxx-xxx-xxx;xxx-xxx-xxx-xxx-xxx')
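
The same pattern applies to other principal types and roles from the list above. As a sketch, granting the viewers role to a user might look like this; the user principal name is a made-up placeholder.

```kusto
// 'user@contoso.com' is a hypothetical user principal name.
.add database MyDatabase viewers (
    'aaduser=user@contoso.com')
```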

Grant a role assignment at Table level

Very similar to the previous example, but this grants permission on a specific table only.

10/17/2023: As of now, permissions added at Table level are not visible in the Portal UI.

.add table MyTable ingestors (
    'aadapp=xxx-xxx-xxx-xxx-xxx;xxx-xxx-xxx-xxx-xxx')

Copy a Table from one Cluster to another Cluster

.set MyDestinationTableName <|
    cluster('https://mySourceCluster.eastus.kusto.windows.net').
    database('mySourceDatabase').
    MySourceTable

Show an ADX Function Definition

.show function MyFunc
| project Body

Rename a Table Column

.rename column MyTable.MyColumn to MyNewColumnName

Delete Function

.drop function MyFunction

Delete Table

.drop table MyTable

Export to Storage Account

.export to csv ('https://connectionString.....')
    with (
        namePrefix = 'fileName',
        includeHeaders = 'all',
        distribution = 'single',
        sizeLimit = 2147483648) <|
    SigninLogs
    | where TimeGenerated < now()
        and UserPrincipalName == '....'

Get Table Details

.show tables details
| where TotalRowCount > 0
| extend TotalExtentSize = format_bytes(TotalExtentSize)
| extend TotalOriginalSize = format_bytes(TotalOriginalSize)
| extend HotExtentSize = format_bytes(HotExtentSize)
| extend HotOriginalSize = format_bytes(HotOriginalSize)
| project-away
    Folder,
    DocString,
    AuthorizedPrincipals,
    RetentionPolicy,
    CachingPolicy,
    ShardingPolicy,
    MergePolicy,
    StreamingIngestionPolicy,
    IngestionBatchingPolicy,
    RowOrderPolicy,
    TableId

# requires nuget package Microsoft.Azure.Kusto.Tools
# https://www.nuget.org/packages/Microsoft.Azure.Kusto.Tools/

using namespace System.IO
using namespace System.Data
using namespace System.Management.Automation
using namespace System.Security.Authentication
using namespace Kusto.Cloud.Platform.Data
using namespace Kusto.Data
using namespace Kusto.Data.Common
using namespace Kusto.Data.Ingestion
using namespace Kusto.Data.Net.Client
using namespace Kusto.Ingest

if ($IsCoreCLR) {
    $netver = 'net6.0'
}
else {
    $netver = 'net472'
}

$path = [System.IO.Path]::Combine($PSScriptRoot, 'Microsoft.Azure.Kusto.Tools', $netver)
Add-Type -Path @(
    [System.IO.Path]::Combine($path, 'Kusto.Data.dll')
    [System.IO.Path]::Combine($path, 'Kusto.Ingest.dll')
)

$script:_crp = [ClientRequestProperties]::new()
$script:_crp.SetOption(
    [ClientRequestProperties]::OptionServerTimeout,
    [TimeSpan]::FromMinutes(10))
$script:_crp.SetOption(
    [ClientRequestProperties]::OptionNoTruncation,
    $true)

function Connect-Kusto {
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [string] $Cluster,

        [Parameter(Mandatory)]
        [string] $Database,

        [Parameter(ParameterSetName = 'Credential', Mandatory)]
        [CredentialAttribute()]
        [pscredential] $Credential,

        [Parameter(ParameterSetName = 'Credential', Mandatory)]
        [guid] $TenantId,

        [Parameter(ParameterSetName = 'Identity')]
        [switch] $Identity
    )

    $builder = [KustoConnectionStringBuilder]::new($Cluster, $Database)

    if ($Identity.IsPresent) {
        $script:_builder = $builder.WithAadSystemManagedIdentity()
        return
    }

    $script:_builder = $builder.WithAadApplicationKeyAuthentication(
        $Credential.UserName,
        $Credential.GetNetworkCredential().Password,
        $TenantId)
}

class Assert {
    static [void] Connected([PSCmdlet] $cmdlet) {
        if ($script:_builder) {
            return
        }

        $cmdlet.ThrowTerminatingError(
            [ErrorRecord]::new(
                [AuthenticationException]::new("Authentication needed. Please call 'Connect-Kusto'."),
                'AuthRequired',
                [ErrorCategory]::AuthenticationError,
                $null))
    }
}

function Invoke-KustoQuery {
    [CmdletBinding(DefaultParameterSetName = 'ToJson')]
    param(
        [Parameter(Mandatory, Position = 0)]
        [string] $Query,

        [Parameter(ParameterSetName = 'ToEnumerable')]
        [switch] $ToEnumerableObjectArray,

        [Parameter(ParameterSetName = 'ToEnumerable')]
        [type] $OutputType,

        [Parameter(ParameterSetName = 'ToJson')]
        [switch] $ToJson
    )

    [Assert]::Connected($PSCmdlet)

    try {
        $provider = [KustoClientFactory]::CreateCslQueryProvider($script:_builder)
        $reader = $provider.ExecuteQuery($Query, $script:_crp)

        if ($ToJson) {
            return [ExtendedDataReader]::ToJsonString($reader)
        }

        if (-not $ToEnumerableObjectArray.IsPresent) {
            # Default output: one pscustomobject per row of the first result table.
            $out = [ordered]@{}
            $json = ConvertFrom-Json ([ExtendedDataReader]::ToJsonString($reader))
            $columns = $json.Tables[0].Columns.ColumnName
            foreach ($row in $json.Tables[0].Rows) {
                $i = 0
                foreach ($column in $columns) {
                    $out[$column] = $row[$i++]
                }

                [pscustomobject] $out
                $out.Clear()
            }

            return
        }

        $enumerable = [ExtendedDataReader]::ToEnumerableObjectArray($reader, $true)
        if (-not $OutputType) {
            return $enumerable
        }

        foreach ($object in $enumerable) {
            [LanguagePrimitives]::ConvertTo($object, $OutputType)
        }
    }
    catch {
        $PSCmdlet.WriteError($_)
    }
    finally {
        # Explicit null checks instead of `?.` so the file also parses in
        # Windows PowerShell 5.1 (the net472 branch above).
        if ($null -ne $provider) { $provider.Dispose() }
        if ($null -ne $reader -and -not $ToEnumerableObjectArray.IsPresent) {
            $reader.Dispose()
        }
    }
}

function Invoke-KustoIngestFromStream {
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [string] $Table,

        [Parameter(Mandatory)]
        [Stream] $Stream,

        [Parameter()]
        [ValidateNotNull()]
        [Kusto.Ingest.IngestionMapping] $Mapping,

        [Parameter()]
        [ValidateNotNull()]
        [Kusto.Data.Common.DataSourceFormat] $Format = [Kusto.Data.Common.DataSourceFormat]::csv,

        [Parameter()]
        [switch] $IgnoreFirstRecord,

        [Parameter()]
        [switch] $LeaveOpen
    )

    [Assert]::Connected($PSCmdlet)

    try {
        $ingestionProps = @{
            DatabaseName = $script:_builder.InitialCatalog
            TableName    = $Table
            Format       = $Format
        }

        if ($Mapping) {
            $ingestionProps['IngestionMapping'] = $Mapping
        }

        if ($IgnoreFirstRecord.IsPresent) {
            $ingestionProps['IgnoreFirstRecord'] = $true
        }

        $factory = [KustoIngestFactory]::CreateDirectIngestClient($script:_builder)
        $task = $factory.IngestFromStreamAsync($Stream, [KustoIngestionProperties] $ingestionProps)
        $null = $task.GetAwaiter().GetResult()
    }
    catch {
        $PSCmdlet.WriteError($_)
    }
    finally {
        # Explicit null checks instead of `?.` so the file also parses in
        # Windows PowerShell 5.1 (the net472 branch above).
        if ($null -ne $factory) { $factory.Dispose() }
        if ($null -ne $Stream -and -not $LeaveOpen.IsPresent) {
            $Stream.Dispose()
        }
    }
}

function Invoke-KustoIngestFromFile {
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [string] $Path,

        [Parameter(Mandatory)]
        [string] $Table,

        [Parameter()]
        [ValidateNotNull()]
        [Kusto.Ingest.IngestionMapping] $Mapping,

        [Parameter()]
        [ValidateNotNull()]
        [Kusto.Data.Common.DataSourceFormat] $Format = [Kusto.Data.Common.DataSourceFormat]::csv,

        [Parameter()]
        [switch] $IgnoreFirstRecord
    )

    [Assert]::Connected($PSCmdlet)

    try {
        $Path = Convert-Path -LiteralPath $Path
        $ingestionProps = @{
            DatabaseName = $script:_builder.InitialCatalog
            TableName    = $Table
            Format       = $Format
        }

        if ($Mapping) {
            $ingestionProps['IngestionMapping'] = $Mapping
        }

        if ($IgnoreFirstRecord.IsPresent) {
            $ingestionProps['IgnoreFirstRecord'] = $true
        }

        $factory = [KustoIngestFactory]::CreateDirectIngestClient($script:_builder)
        $task = $factory.IngestFromStorageAsync($Path, [KustoIngestionProperties] $ingestionProps)
        $null = $task.GetAwaiter().GetResult()
    }
    catch {
        $PSCmdlet.WriteError($_)
    }
    finally {
        # Explicit null check instead of `?.` so the file also parses in
        # Windows PowerShell 5.1 (the net472 branch above).
        if ($null -ne $factory) { $factory.Dispose() }
    }
}

function Invoke-KustoControlCommand {
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [string] $Command
    )

    [Assert]::Connected($PSCmdlet)

    try {
        $factory = [KustoClientFactory]::CreateCslAdminProvider($script:_builder)
        $reader = $factory.ExecuteControlCommand($Command)
        [ExtendedDataReader]::ToDataSet($reader).Tables[0]
    }
    catch {
        $PSCmdlet.WriteError($_)
    }
    finally {
        # Explicit null checks instead of `?.` so the file also parses in
        # Windows PowerShell 5.1 (the net472 branch above).
        if ($null -ne $factory) { $factory.Dispose() }
        if ($null -ne $reader) { $reader.Dispose() }
    }
}