Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
CREATE STREAM WINLOGBEAT_STREAM (source_name VARCHAR, type VARCHAR, task VARCHAR, log_name VARCHAR, computer_name VARCHAR, event_data STRUCT< UtcTime VARCHAR, ProcessGuid VARCHAR, ProcessId INTEGER, Image VARCHAR, FileVersion VARCHAR, Description VARCHAR, Product VARCHAR, Company VARCHAR, CommandLine VARCHAR, CurrentDirectory VARCHAR, User VARCHAR, LogonGuid VARCHAR, LogonId VARCHAR, TerminalSessionId INTEGER, IntegrityLevel VARCHAR, Hashes VARCHAR, ParentProcessGuid VARCHAR, ParentProcessId INTEGER, ParentImage VARCHAR, ParentCommandLine VARCHAR, Protocol VARCHAR, Initiated VARCHAR, SourceIsIpv6 VARCHAR, SourceIp VARCHAR, SourceHostname VARCHAR, SourcePort INTEGER, SourcePortName VARCHAR, DestinationIsIpv6 VARCHAR, DestinationIp VARCHAR, DestinationHostname VARCHAR, DestinationPort INTEGER, DestinationPortName VARCHAR>, event_id INTEGER) WITH (KAFKA_TOPIC='winlogbeat', VALUE_FORMAT='JSON');
CREATE STREAM WINLOGBEAT_STREAM_REKEY WITH (VALUE_FORMAT='JSON', PARTITIONS=1, TIMESTAMP='event_date_creation') AS SELECT STRINGTOTIMESTAMP(event_data->UtcTime, 'yyyy-MM-dd HH:mm:ss.SSS') AS event_date_creation, event_data->ProcessGuid AS process_guid, event_data->ProcessId AS process_id, event_data->Image AS process_path, event_data->FileVersion AS file_version, event_data->Description AS file_description, event_data->Company AS file_company, event_data->CommandLine AS process_command_line, event_data->CurrentDirectory AS process_current_directory, event_data->User AS user_account, event_data->LogonGuid AS user_logon_guid, event_data->LogonId AS user_logon_id, event_data->TerminalSessionId AS user_session_id, event_data->IntegrityLevel AS process_integrity_level, event_data->Hashes AS hashes, event_data->ParentProcessGuid AS parent_process_guid,event_data->ParentProcessId AS parent_process_id,event_data->ParentImage AS parent_process_path,event_data->ParentCommandLine AS parent_process_command_line,event_data->Protocol AS network_protocol,event_data->Initiated AS network_connection_initiated,event_data->SourceIsIpv6 AS src_is_ipv6,event_data->SourceIp AS src_ip_addr,event_data->SourceHostname AS src_host_name,event_data->SourcePort AS src_port,event_data->SourcePortName AS src_port_name,event_data->DestinationIsIpv6 AS dst_is_ipv6,event_data->DestinationIp AS dst_ip_addr,event_data->DestinationHostname AS dst_host_name,event_data->DestinationPort AS dst_port,event_data->DestinationPortName AS dst_port_name,event_id,source_name,log_name FROM WINLOGBEAT_STREAM WHERE source_name='Microsoft-Windows-Sysmon' PARTITION BY process_guid;
CREATE STREAM SYSMON_PROCESS_CREATE WITH (VALUE_FORMAT='JSON', PARTITIONS=1, TIMESTAMP='event_date_creation')AS SELECT event_date_creation,process_guid,process_id,process_path,file_version,file_description,file_company,process_command_line,process_current_directory,user_account,user_logon_guid,user_logon_id,user_session_id,process_integrity_level,hashes,parent_process_guid,parent_process_id,parent_process_path,parent_process_command_line,event_id,source_name,log_name FROM WINLOGBEAT_STREAM_REKEY WHERE event_id=1;
CREATE STREAM SYSMON_NETWORK_CONNECT WITH (VALUE_FORMAT='JSON', PARTITIONS=1, TIMESTAMP='event_date_creation')AS SELECT event_date_creation,process_guid,process_id,process_path,user_account,network_protocol,network_connection_initiated,src_is_ipv6,src_ip_addr,src_host_name,src_port,src_port_name,dst_is_ipv6,dst_ip_addr,dst_host_name,dst_port,dst_port_name,event_id,source_name,log_name FROM WINLOGBEAT_STREAM_REKEY WHERE event_id=3;
CREATE TABLE SYSMON_PROCESS_CREATE_TABLE (event_date_creation VARCHAR,process_guid VARCHAR,process_id INTEGER,process_path VARCHAR,file_version VARCHAR,file_description VARCHAR,file_company VARCHAR,process_command_line VARCHAR,process_current_directory VARCHAR,user_account VARCHAR,user_logon_guid VARCHAR,user_logon_id VARCHAR,user_session_id INTEGER,process_integrity_level VARCHAR,hashes VARCHAR,parent_process_guid VARCHAR,parent_process_id INTEGER,parent_process_path VARCHAR,parent_process_command_line VARCHAR,event_id INTEGER,source_name VARCHAR,log_name VARCHAR) WITH (KAFKA_TOPIC='SYSMON_PROCESS_CREATE', VALUE_FORMAT='JSON', KEY='process_guid');
CREATE STREAM SYSMON_JOIN WITH (PARTITIONS=1) AS SELECT N.EVENT_DATE_CREATION, N.PROCESS_GUID, N.PROCESS_ID, N.PROCESS_PATH, N.USER_ACCOUNT,N.NETWORK_PROTOCOL, N.NETWORK_CONNECTION_INITIATED, N.SRC_IS_IPV6, N.SRC_IP_ADDR,N.SRC_HOST_NAME, N.SRC_PORT, N.SRC_PORT_NAME, N.DST_IS_IPV6, N.DST_IP_ADDR, N.DST_HOST_NAME,N.DST_PORT, N.DST_PORT_NAME, N.SOURCE_NAME, N.LOG_NAME,P.PROCESS_COMMAND_LINE, P.HASHES, P.PARENT_PROCESS_PATH, P.PARENT_PROCESS_COMMAND_LINE,P.USER_LOGON_GUID, P.USER_LOGON_ID, P.USER_SESSION_ID, P.PROCESS_CURRENT_DIRECTORY,P.PROCESS_INTEGRITY_LEVEL, P.PARENT_PROCESS_GUID, P.PARENT_PROCESS_ID FROM SYSMON_NETWORK_CONNECT N INNER JOIN SYSMON_PROCESS_CREATE_TABLE P ON N.PROCESS_GUID = P.PROCESS_GUID;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.