Skip to content

Instantly share code, notes, and snippets.

@automine
Last active May 23, 2024 16:23
Show Gist options
  • Save automine/2b079e166d706471ca70560a51c8cf38 to your computer and use it in GitHub Desktop.
Save automine/2b079e166d706471ca70560a51c8cf38 to your computer and use it in GitHub Desktop.

Masa Diagram in Mermaid

flowchart TB
    

    subgraph tail
        TailReader("TailReader (tailing)")
    end
    
    TailReader -->  aeq
    TailReader --> |"if needs to read > min_batch_size_bytes in limits.conf"| batchR
    aeq(["aeq/aq"]) -->  archiveProcessor

    subgraph Batch
        batchR(BatchReader)
    end

    subgraph archivepipe
        archiveProcessor(archiveProcessor)
    end
 
    subgraph exec
        ex(exec)
    end

    hecq([httpinputq]) --> HttpInputServer
    subgraph HEC
        HttpInputServer
    end

    udp_queue([udp_queue]) --> udpIn
    subgraph udp
        udpIn(udp)
    end
    
    tcp_queue([tcpin_queue]) --> tcpIn
    subgraph tcp
        tcpIn(tcp)
    end

    subgraph fifo
        fifoIn(fifo)
    end

    subgraph fschangemanager
        fscIn(fschangemanager)
    end
    
    ex -. For WMI Input .-> winParsing
    winParsing([WinParsingQueue]) --> winUft8
    stashParsing([stashhparsing]) --> stashUft8

    fschangemanager --> parsingQueue
    batchR --> parsingQueue
    archiveProcessor --> parsingQueue
    ex --> parsingQueue
    HttpInputServer-- services/collector/event -->parsingQueue
    udpIn --> parsingQueue
    tcpIn --> parsingQueue
    fifoIn --> parsingQueue
    parsingQueue([parsingQueue]) --> parsingUft8

    subgraph stashparsing
        stashUft8[utf8] --> stashLB[linebreaker] --> stashHead[header] --> stashAgg[aggregator] --> stashRegex[regexreplacement]
    end

    subgraph winparsing
        winUft8[utf8] --> winLB[linebreaker] --> winHead[header] --> winAgg[aggregator]
    end

    subgraph Parsing
        parsingUft8[utf8] --> parseLB[linebreaker] --> parseMetrics[metrics] --> parseHead[header]
    end

    parseHead --> aggQueue
    aggQueue([aggQueue]) --> mergingAgg

    subgraph Merging
        mergingAgg[aggregator]
    end

    stashRegex --> typingQueue
    mergingAgg --> typingQueue
    winAgg --> typingQueue
    typingQueue([typingQueue]) --> typeRegex

    subgraph Typing
        typeRegex[regexreplacement] --> typeMetrics[metricschema] --> typeAnn[annotator]
    end

    auditTrail("AuditTrailManager (audittrail events)") --> auditqueue([auditqueue])
    
    typeAnn --> indexQueue
    typeAnn --> nullQ
    HttpInputServer-- services/collector/raw -->indexQueue
    auditqueue --> indexQueue
        fschangemanager -.-> indexQueue

    indexQueue([indexQueue]) --> tcpO

    subgraph indexerPipe
        tcpO(tcp-output-generic-processor) --> syslog(syslog-output-generic-processor) --> idxF(indexandforward) --> signing(signing) --> idx(indexer) --> idxT(indexer_thruput)
    end

    tcpO --> tcpout
    tcpout([tcpout_queue per group])

    idx --> id1
    id1[(Database Datastore)]

    syslog --> udpOut
    udpOut(udp out)

    nullQ(nullQueue) --> nullQueue

    subgraph dev-null
        nullQueue
    end
Loading
@sowings
Copy link

sowings commented Jun 23, 2022

Does this cover where things from the UF come from, and inputs.conf [splunktcp] route ?

@automine
Copy link
Author

@sowings I updated the batch part, good call. Right now this just covers what is in the existing Masa diagram for indexers, which does not have updates for IA. Next step would be the UF --> IDX and then trying to get info on the IA stuff for 9.

@VatsalJagani
Copy link

@automine - There are two endpoints for HEC. One endpoint (http/collector/raw) currently represents that goes through parsing and there is a main HEC endpoint (http/collector) that bypasses the parsing queue.

@VatsalJagani
Copy link

@automine - I think nullQueue happens after TypingQueue, isn't it?

@VatsalJagani
Copy link

Here is the updated code with the above two changes:

flowchart TB
    

    subgraph tail
        TailReader("TailReader (tailing)")
    end
    
    TailReader -->  aeq
    TailReader --> |"if needs to read > min_batch_size_bytes in limits.conf"| batchR
    aeq(["aeq/aq"]) -->  archiveProcessor

    subgraph Batch
        batchR(BatchReader)
    end

    subgraph archivepipe
        archiveProcessor(archiveProcessor)
    end
 
    subgraph exec
        ex(exec)
    end

    hecq([httpinputq]) --> HttpInputServer
    subgraph HEC
        HttpInputServer
    end

    udp_queue([udp_queue]) --> udpIn
    subgraph udp
        udpIn(udp)
    end
    
    tcp_queue([tcpin_queue]) --> tcpIn
    subgraph tcp
        tcpIn(tcp)
    end

    subgraph fifo
        fifoIn(fifo)
    end

    subgraph fschangemanager
        fscIn(fschangemanager)
    end
    
    ex -. For WMI Input .-> winParsing
    winParsing([WinParsingQueue]) --> winUft8
    stashParsing([stashhparsing]) --> stashUft8

    fschangemanager --> parsingQueue
    batchR --> parsingQueue
    archiveProcessor --> parsingQueue
    ex --> parsingQueue
    HttpInputServer-- services/collector/event -->parsingQueue
    udpIn --> parsingQueue
    tcpIn --> parsingQueue
    fifoIn --> parsingQueue
    parsingQueue([parsingQueue]) --> parsingUft8

    subgraph stashparsing
        stashUft8[utf8] --> stashLB[linebreaker] --> stashHead[header] --> stashAgg[aggregator] --> stashRegex[regexreplacement]
    end

    subgraph winparsing
        winUft8[utf8] --> winLB[linebreaker] --> winHead[header] --> winAgg[aggregator]
    end

    subgraph Parsing
        parsingUft8[utf8] --> parseLB[linebreaker] --> parseMetrics[metrics] --> parseHead[header]
    end

    parseHead --> aggQueue
    aggQueue([aggQueue]) --> mergingAgg

    subgraph Merging
        mergingAgg[aggregator]
    end

    stashRegex --> typingQueue
    mergingAgg --> typingQueue
    winAgg --> typingQueue
    typingQueue([typingQueue]) --> typeRegex

    subgraph Typing
        typeRegex[regexreplacement] --> typeMetrics[metricschema] --> typeAnn[annotator]
    end

    auditTrail("AuditTrailManager (audittrail events)") --> auditqueue([auditqueue])
    
    typeAnn --> indexQueue
    typeAnn --> nullQ
    HttpInputServer-- services/collector/raw -->indexQueue
    auditqueue --> indexQueue
        fschangemanager -.-> indexQueue

    indexQueue([indexQueue]) --> tcpO

    subgraph indexerPipe
        tcpO(tcp-output-generic-processor) --> syslog(syslog-output-generic-processor) --> idxF(indexandforward) --> signing(signing) --> idx(indexer) --> idxT(indexer_thruput)
    end

    tcpO --> tcpout
    tcpout([tcpout_queue per group])

    idx --> id1
    id1[(Database Datastore)]

    syslog --> udpOut
    udpOut(udp out)

    nullQ(nullQueue) --> nullQueue

    subgraph dev-null
        nullQueue
    end

@automine
Copy link
Author

Finanally got around to updating this. Thanks @VatsalJagani !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment