@OlegSchwann
Last active September 8, 2022 08:41
ClickHouse: Can a window view read from a Kafka engine table?
// traffic_distribution.proto: schema referenced by format_schema in the Kafka tables below.
syntax = "proto3";

package traffic_distribution;

message CampaignTraffic {
    uint32 CampaignID   = 1;
    string RotatorName  = 2;
    int64  LastActionAt = 3; // timestamp
    uint64 Count        = 4;
}

message CampaignTrafficAggregated {
    uint32 CampaignID   = 1;
    string RotatorName  = 2;
    int64  LastActionAt = 3; // timestamp
    float  Part         = 5;
}
CREATE DATABASE IF NOT EXISTS traffic_distribution;

CREATE TABLE traffic_distribution.campaign_traffic
(
    CampaignID   UInt32,
    RotatorName  String,
    LastActionAt DateTime,
    Count        UInt64
)
-- ENGINE = MergeTree ORDER BY tuple(); -- for regular tables everything works
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list  = 'campaigns_traffic',
    kafka_group_name  = 'campaigns_traffic', -- TODO: read all from Kafka every time
    format_schema     = 'traffic_distribution.proto:CampaignTraffic',
    kafka_format      = 'Protobuf';
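
For comparison, the usual way to drain a Kafka engine table is a plain materialized view that copies each consumed batch into a MergeTree table; the question in this gist is whether a WINDOW VIEW can take the place of that materialized view. The sketch below shows only the conventional pattern, and the names campaign_traffic_store and campaign_traffic_consumer are hypothetical, not part of the original setup.

-- Conventional pattern (sketch): Kafka engine table -> materialized view -> MergeTree.
-- The table and view names here are hypothetical.
CREATE TABLE traffic_distribution.campaign_traffic_store
(
    CampaignID   UInt32,
    RotatorName  String,
    LastActionAt DateTime,
    Count        UInt64
)
ENGINE = MergeTree
ORDER BY (CampaignID, RotatorName, LastActionAt);

CREATE MATERIALIZED VIEW traffic_distribution.campaign_traffic_consumer
TO traffic_distribution.campaign_traffic_store
AS SELECT CampaignID, RotatorName, LastActionAt, Count
FROM traffic_distribution.campaign_traffic;
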
CREATE TABLE traffic_distribution.campaign_traffic_aggregated
(
    CampaignID   UInt32,
    RotatorName  String,
    LastActionAt DateTime,
    Part         Float32
)
-- ENGINE = MergeTree ORDER BY tuple(); -- for regular tables everything works
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list  = 'campaigns_traffic_aggregated',
    kafka_group_name  = 'campaigns_traffic',
    format_schema     = 'traffic_distribution.proto:CampaignTrafficAggregated',
    kafka_format      = 'Protobuf';
CREATE WINDOW VIEW traffic_distribution.window_view
TO traffic_distribution.campaign_traffic_aggregated
WATERMARK = ASCENDING
AS SELECT
    CampaignID,
    RotatorName,
    tumbleEnd(wid) AS LastActionAt,
    (sum(Count) AS InOneRotator) / (sum(InOneRotator) OVER (PARTITION BY CampaignID) AS InAllRotators) AS Part
FROM traffic_distribution.campaign_traffic
GROUP BY
    tumble(campaign_traffic.LastActionAt, INTERVAL '5' SECOND) AS wid,
    CampaignID,
    RotatorName
SETTINGS allow_experimental_window_view = 1;
INSERT INTO traffic_distribution.campaign_traffic VALUES
(1, 'us', now(), 1),
(1, 'us', now(), 2),
(1, 'gb', now(), 2),
(1, 'au', now(), 3),
(1, 'nz', now(), 4),
(1, 'ca', now(), 5),
(2, 'us', now(), 6),
(2, 'us', now(), 7),
(2, 'gb', now(), 7),
(2, 'au', now(), 8),
(2, 'nz', now(), 9),
(2, 'ca', now(), 10);
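
To check that the test rows actually pass through the topic, the Kafka source table can be read directly. This is only a sketch: it assumes the 22.x setting stream_like_engine_allow_direct_select, and such a read consumes offsets for the 'campaigns_traffic' consumer group, so it competes with any attached view.

-- Sanity-check sketch: direct read from the Kafka engine table.
-- Assumes stream_like_engine_allow_direct_select is available (22.x);
-- note that this consumes messages on behalf of the 'campaigns_traffic' group.
SELECT CampaignID, RotatorName, LastActionAt, Count
FROM traffic_distribution.campaign_traffic
SETTINGS stream_like_engine_allow_direct_select = 1;
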
2022.09.08 08:39:33.820921 [ 261 ] {} <Error> void DB::StorageKafka::threadFunc(size_t): Code: 49. DB::Exception: Block structure mismatch in function connect between CopyingDataToViewsTransform and PushingToWindowViewSink stream: different number of columns:
CampaignID UInt32 UInt32(size = 0), RotatorName String String(size = 0), LastActionAt DateTime UInt32(size = 0), Count UInt64 UInt64(size = 0), _topic LowCardinality(String) ColumnLowCardinality(size = 0, UInt8(size = 0), ColumnUnique(size = 1, String(size = 1))), _key String String(size = 0), _offset UInt64 UInt64(size = 0), _partition UInt64 UInt64(size = 0), _timestamp Nullable(DateTime) Nullable(size = 0, UInt32(size = 0), UInt8(size = 0)), _timestamp_ms Nullable(DateTime64(3)) Nullable(size = 0, DateTime64(size = 0), UInt8(size = 0)), _headers.name Array(String) Array(size = 0, UInt64(size = 0), String(size = 0)), _headers.value Array(String) Array(size = 0, UInt64(size = 0), String(size = 0))
CampaignID UInt32 UInt32(size = 0), RotatorName String String(size = 0), LastActionAt DateTime UInt32(size = 0), Count UInt64 UInt64(size = 0). (LOGICAL_ERROR), Stack trace (when copying this message, always include the lines below):
0. DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) in /usr/bin/clickhouse
1. ? in /usr/bin/clickhouse
2. DB::connect(DB::OutputPort&, DB::InputPort&) in /usr/bin/clickhouse
3. DB::buildPushingToViewsChain(std::__1::shared_ptr<DB::IStorage> const&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const> const&, std::__1::shared_ptr<DB::Context const>, std::__1::shared_ptr<DB::IAST> const&, bool, DB::ThreadStatus*, std::__1::atomic<unsigned long>*, DB::Block const&) in /usr/bin/clickhouse
4. DB::InterpreterInsertQuery::buildChainImpl(std::__1::shared_ptr<DB::IStorage> const&, std::__1::shared_ptr<DB::StorageInMemoryMetadata const> const&, DB::Block const&, DB::ThreadStatus*, std::__1::atomic<unsigned long>*) in /usr/bin/clickhouse
5. DB::InterpreterInsertQuery::execute() in /usr/bin/clickhouse
6. DB::StorageKafka::streamToViews() in /usr/bin/clickhouse
7. DB::StorageKafka::threadFunc(unsigned long) in /usr/bin/clickhouse
8. DB::BackgroundSchedulePoolTaskInfo::execute() in /usr/bin/clickhouse
9. DB::BackgroundSchedulePool::threadFunction() in /usr/bin/clickhouse
10. ? in /usr/bin/clickhouse
11. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) in /usr/bin/clickhouse
12. ? in /usr/bin/clickhouse
13. ? in ?
14. __clone in ?
(version 22.8.4.7 (official build))
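
The commented-out MergeTree engines above note that the same window view works over a regular table, so one workaround is to keep the Kafka table as pure ingress, drain it with the plain materialized view sketched earlier, and build the window view over the MergeTree staging table; the aggregated rows still reach Kafka through the TO table. A sketch, reusing the hypothetical campaign_traffic_store table:

-- Workaround sketch: the same aggregation, but reading from the MergeTree staging table
-- (campaign_traffic_store is the hypothetical table from the sketch above).
CREATE WINDOW VIEW traffic_distribution.window_view_over_store
TO traffic_distribution.campaign_traffic_aggregated
WATERMARK = ASCENDING
AS SELECT
    CampaignID,
    RotatorName,
    tumbleEnd(wid) AS LastActionAt,
    (sum(Count) AS InOneRotator) / (sum(InOneRotator) OVER (PARTITION BY CampaignID) AS InAllRotators) AS Part
FROM traffic_distribution.campaign_traffic_store
GROUP BY
    tumble(campaign_traffic_store.LastActionAt, INTERVAL '5' SECOND) AS wid,
    CampaignID,
    RotatorName
SETTINGS allow_experimental_window_view = 1;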