Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
database messaging distributed kafka

database, messaging, distributed system, kafka

cassandra

Background

This should give a quick overview over important features of Cassandra:

  • Has a flexible Row-Oriented Column Structure Schema
  • for structured, semi-structured and unstructured data
  • no downtime when changing schema
  • Cassandra is based on Google BigTable and Amazon Dynamo.
  • Is Peer-to-Peer distributed System
  • Uses Gossip Protocol to exchange information across the Cluster
  • Multiple Indexes are possible
  • Giabyte to Bentabyte scaleability
  • highly scaleable with adding new nodes (linear performance increase)
  • no single point of failure
  • for read or write intense Data
  • Replicate on different racks / different data centers
  • not Operations intense
  • Data Compression (Google's Snappy Algorithm)
  • no need for special caching software
  • configurable data consistency and How to get strong consistency
  • CQL Language (very similar to SQL: DDL, DML, DQL)
  • Apache 2.0 License

Use Cases

  • Real Time Data
  • As well as Big Data

What is Cassandra used for and by whom? Cassandra Use Cases

System Requirements

Cassandra can run on commodity hardware: Selecting hardware for enterprise implementations

Modelling Data

With Cassandra you should follow a Query Driven Schema Design. In general, Data Modelling with Cassandra is harder than with RDBMS

Cassandra is an index construction kit -- Benjamin Black, 2010

Do the simplest thing that can work, then add just enough complexity to make it perform well

Key concepts and data model

  • Modeled after BigTable
  • RowKey and a lot of columns
  • Column names are sorted (definable)
  • Rows belong to a node and are replicated
  • Row lookups are fast (randomly distributed in cluster)
  • Has the concept of Column Families (no fixed set of columns), a set of key/value pairs, they are called Tables in CQL 3
  • Every Row can have different named columns
  • Wide Rows: The maximum number of cells (rows x columns) in a single partition is (2 billion columns [en] = 2 milliarden [de] ), that stick together (in sorted order on disk)
  • Compound Types (Tuples)
  • Elements: (PartitionKey, ClusteringColumn, DataColumnName, DataColumnValue)
  • Cassandra >= 3.0 supports User Defined Functions (UDFs)
  • indexes How are indexes stored and updated?: Relational Concept -> Sequences (autocreated incrementing unique id) Cassandra Concept -> No Sequences -> Use part of the data to create unique index, or UUID

** Entity Model (with Cassandra)**

don't be afraid of writing into several Tables at once

See: CQL Docs

Examples:

How to implement 1-many relationship

  • Have two tables, where the second has a wide row
  • PIMARY KEY(name, productID) # wide row
  • and insert your records into both Tables, for fast lookups

How to implement many-many relationship

  • have two tables with the following PRIMARY KEYS
  • PIMARY KEY(name, productID) # in first table
  • PIMARY KEY(productID, name) # in second table
  • and insert your records into both Tables, to view from either side

With Cassandra, you have to define the Schema, but you can ALTER the schema on the fly, without downtime.

Table definition examples (using cqlsh):

  • CREATE TABLE mytable (name varchar, PIMARY KEY (name)) # row key
  • ALTER TABLE mytable ADD date timestamp # without downtime
  • CREATE TYPE point (x double, y double)
  • CREATE TABLE locations (location frozen PRIMARY KEY)
  • CREATE TYPE bounding_box (lower_left frozen, top_right frozen)
  • CREATE TABLE spatial_index (region frozen<bounding_box>, location frozen , data blob, PRIMARY KEY (region, location, data)) dedormalize your TABLEs

Select example:

  • SELECT location, data FROM spatial_index WHERE region IN (regions)

Supported

  • combined Primary Key
  • Secondary Indexes (not combined) -> should not be the main criteria, -> wide rows are more efficient
  • different Data Types, including Set, Map, List and basic data types, blob etc.
  • Partition Key
  • The Clustering Column defines the Order of the Rows
  • Optional Time to Live can be set on the Column per INSERT

Limitations

  • How you setup Tables limits the query possibilities
  • Rows have a single order
  • no Integrity constraints (e.g. unique, or not null, or foreign key)
  • There is some sort of Transaction, but not like in RDBMS.
  • CQL is much more limited compared to SQL. e.g. no LIMIT or OFFSET
  • Where Clause limitations

CAP Theorem well explained: http://blog.cloudera.com/blog/2010/04/cap-confusion-problems-with-partition-tolerance/

Strong Consistency -> Relationale Datenbanken -> HBase -> Hazelcast? -> memsql -> ...

memsql

durability: http://docs.memsql.com/latest/admin/durability_recovery/#id1

running with docker / aws: http://docs.memsql.com/latest/setup/quick_start/ http://docs.memsql.com/latest/setup/setup_cloud/

distributed sql: http://docs.memsql.com/latest/concepts/distributed_sql/

columnstore (disk) vs rowstore (inMemory) http://docs.memsql.com/4.0/concepts/columnstore/

The selection of store is done through defining indexes on a table, some indexes leverage the row store and some leverage the columnstore technology. The only reason to choose either store is to improve performance.


BigTable Databases like HBase only support lexographical indexes, no monotonically increasing id’s. -> monotonically increasing values are bad.


hazelcast

editions / versions

https://hazelcast.com/products/

partitions

The partition count is 271 by default. This count is configurable and can be changed using the system property hazelcast.partition.count.

Map Backups

Hazelcast distributes map entries onto multiple JVMs (cluster members). Each JVM holds some portion of the data.

Distributed maps have 1 backup by default. If a member goes down, you do not lose data. Backup operations are synchronous, so when a map.put(key, value) returns, it is guaranteed that the entry is replicated to one other node. For the reads, it is also guaranteed that map.get(key) returns the latest value of the entry. Consistency is strictly enforced.

Read Backup Data

By default, Hazelcast has one sync backup copy. If backup-count is set to more than 1, then each member will carry both owned entries and backup copies of other members. So for the map.get(key) call, it is possible that the calling member has a backup copy of that key. By default, map.get(key) will always read the value from the actual owner of the key for consistency. It is possible to enable backup reads (read local backup entries) by setting the value of the read-backup-data property to true. Its default value is false for strong consistency. Enabling backup reads can improve performance.

Eviction

Unless you delete the map entries manually or use an eviction policy, they will remain in the map. Hazelcast supports policy based eviction for distributed maps. Currently supported policies are LRU (Least Recently Used) and LFU (Least Frequently Used). There are also other properties as shown in the below sample declarative configuration.

<hazelcast>
  <map name="default">
    ...
    <time-to-live-seconds>0</time-to-live-seconds>
    <max-idle-seconds>0</max-idle-seconds>
    <eviction-policy>LRU</eviction-policy>
    <max-size policy="PER_NODE">5000</max-size>
    <eviction-percentage>25</eviction-percentage>
    ...
  </map>
</hazelcast>

** evicting a single entry ** But, you may particularly want to evict some specific map entries. In this case, you can use the ttl and timeunit parameters of the method map.put(). A sample code line is given below.

myMap.put( "1", "John", 50, TimeUnit.SECONDS )

So, the map entry with the key "1" will be evicted in 50 seconds after it is put into myMap.

Map Persistence

Hazelcast allows you to load and store the distributed map entries from/to a persistent data store such as a relational database. For these, you can use Hazelcast's MapStore and MapLoader interfaces.

When you provide a MapLoader implementation and request an entry (IMap.get()) that does not exist in the memory, MapLoader's load or loadAll methods will load that entry from the data store. This loaded entry is placed into the map and will stay there until it is removed or evicted.

When a MapStore implementation is provided, an entry is put also into a user defined data store.

image NOTE: Data store needs to be a centralized system that is accessible from all Hazelcast Nodes. Persisting to local file system is not supported.

Hazelcast supports read-through, write-through and write-behind persistence modes which are explained in below subsections.

Read-Through

If an entry does not exist in the memory when an application asks, Hazelcast asks your loader implementation to load that entry from the data store. If the entry exists there, the loader implementation gets it, hands it to Hazelcast, and Hazelcast puts it into the memory. This is read-through persistence mode.

Write-Through

MapStore can be configured as write-through by setting the write-delay-seconds property to 0. This means the entries will be put to the data store synchronously.

In this mode, when the map.put(key,value) call returns, you can be sure that

MapStore.store(key,value) is successfully called so the entry is persisted.

In-Memory entry is updated

In-Memory backup copies are successfully created on other JVMs (if backup-count is greater than 0)

Same behavior goes for the map.remove(key), only difference is that MapStore.delete(key) is called when it will be deleted.

If MapStore throws an exception, then the exception will be propagated back to the original put or remove call in the form of RuntimeException.

Write-Behind

MapStore can be configured as write-behind by setting the write-delay-seconds property to a value bigger than 0. This means the modified entries will be put to the data store asynchronously after a configured delay.

image NOTE: In write-behind mode, by default Hazelcast coalesces updates on a specific key, i.e. applies only the last update on it. But, you can set MapStoreConfig#setWriteCoalescing to FALSE and you can store all updates performed on a key to the data store.

Distributed Query

Distributed queries access data from multiple data sources stored on either the same or different computers.

Query Overview

Hazelcast partitions your data and spreads it across cluster of servers. You can iterate over the map entries and look for certain entries (specified by predicates) you are interested in. However, this is not very efficient because you will have to bring the entire entry set and iterate locally. Instead, Hazelcast allows you to run distributed queries on your distributed map.

How It Works

The requested predicate is sent to each member in the cluster. Each member looks at its own local entries and filters them according to the predicate. At this stage, key/value pairs of the entries are deserialized and then passed to the predicate. The predicate requester merges all the results coming from each member into a single set. If you add new members to the cluster, the partition count for each member is reduced and hence the time spent by each member on iterating its entries is reduced. Therefore, the above querying approach is highly scalable. Another reason it is highly scalable is the pool of partition threads that evaluates the entries concurrently in each member. The network traffic is also reduced since only filtered data is sent to the requester.

Hazelcast offers the following APIs for distributed query purposes:

  • Criteria API
  • Distributed SQL Query

Indexing

Hazelcast distributed queries will run on each member in parallel and only results will return the conn. When a query runs on a member, Hazelcast will iterate through the entire owned entries and find the matching ones. This can be made faster by indexing the mostly queried fields, just like you would do for your database. Indexing will add overhead for each write operation but queries will be a lot faster. If you query your map a lot, make sure to add indexes for the most frequently queried fields. For example, if your active and age < 30 query, make sure you add index for active and age fields.

Replicated Map

A replicated map is a weakly consistent, distributed key-value data structure provided by Hazelcast.

All other data structures are partitioned in design. A replicated map does not partition data (it does not spread data to different cluster members); instead, it replicates the data to all nodes.

This leads to higher memory consumption. However, a replicated map has faster read and write access since the data are available on all nodes and writes take place on local nodes, eventually being replicated to all other nodes.

Weak consistency compared to eventually consistency means that replication is done on a best efforts basis. Lost or missing updates are neither tracked nor resent. This kind of data structure is suitable for immutable objects, catalogue data, or idempotent calculable data (like HTML pages).

Replicated map nearly fully implements the java.util.Map interface, but it lacks the methods from java.util.concurrent.ConcurrentMap since there are no atomic guarantees to writes or reads.

Kafka:

  • How are the Partitions of a Topic divided between the Consumers in the ConsumerGroup?

    every partition is delivered to exactly one Consumer in every ConsumerGroup

  • How are the Partitions “newly divided” between the Consumers when the number of Consumer changes?

    Consumer rebalancing is triggered on each addition or removal of both broker nodes and other consumers within the same group. For a given topic and a given consumer group, broker partitions are divided evenly among consumers within the group. A partition is always consumed by a single consumer. Read: Consumer rebalancing algorithm -How Can a consumer tell Kafka how far “forward” it want’s to read the data from the Queue beginning with the cursor (offset)? the Consumer always get all available messages after its current pisition in the log (up to configurable max size) when pulling -How to store the offsets per customer inside Kafka?

  • Does the Broker push to the Consumer, or does the Consumer pull from the Broker?

    the Consumer pulls from the Broker (Push vs. pull)[http://kafka.apache.org/documentation.html#design_pull]

  • How can we be sure that no data gets lost?

    only store the offsets when it is guaranteed that the message is persisted successfully (at least once delivery) log gets flushed to disk after configurable Time or Number of Messages. See: Writes

Notes:

  • Kafka does support message compression (GZIP) or message compaction (only keep the latest keys, older values of the same key are removed from the message). In the current version, Kafka does not support both (compression and message compaction at the same time)
  • kafka >= 0.9 supports quotas on produce and fetch requests (byte-rate thresholds per client-id).
  • Producer default partitioning strategy is hash(key)%numPartitions
  • with the Simple Client, one needs to handle offsets manually.
  • when a new Consumer registres itself, it listens on changes of the number of consumers as well as brokers. to rebalance itself inside the consumer group
  • a partition is always consumed by a single Consumer (inside the ConsumerGroup)
  • Topics can be created manually or automatically when data is first published to a non-existent topic. Configuration for auto-created topics is possible (number of partitions, replications).
  • It is possible to add partitions on the fly. If data is partitioned by hash(key) % number_of_partitions then this partitioning will potentially be shuffled by adding partitions but Kafka will not attempt to automatically redistribute data in any way. See : Modifying topics
  • linearizing id's with kafka: Kafka only provides a total order over messages within a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over messages this can be achieved with a topic that has only one partition, though this will mean only one consumer process per consumer group.
Apache Thrift
Apache Avro
MessagePack
ProtocolBuffers
http://www.zerorpc.io/ for python and node.js
storm switching from zeromq to netty
http://storm.apache.org/2013/12/08/storm090-released.html
http://yahooeng.tumblr.com/post/64758709722/making-storm-fly-with-netty
storm.yaml
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
http://netty.io/index.html
rpc with netty and messagepack:
https://msgpack.wordpress.com/2010/05/04/introduction-to-messagepack-rpc/

spark stream

programming model

  • Stream Primitive: DStream (vs Tuple in Storm)
  • Stream Source: HDFS, Network (vs Spouts in Storm)
  • Computation / Transformation: Transformation, Window Operations
  • Stateful Operations: yes
  • Output / Persistence: foreachRDD

Reliability

  • exactly once, in some node failure scenario falling back to at least once or data loss.
  • If a worker node fails, the system can recompute from input data.
  • Only HDFS-backed data soruces are fully fault tolerant
  • Or a source that can replay data e.g. Kafka in conjunction with Write Ahead Logs -> additional performance penalty.
  • Moving data to HDFS prior to stream processing introduces additional latency
  • If the network receiver node fails, the data received, but not yet replicated to other nodes may be lost

Performance

  • can be better than Storm e.g. when aggregating small records (Micro-Batching)

Latency

  • is worse than Storm's

zookeeper

Zookeeper is a distributed coordination service for distributed applications.

Use Cases: synchronization, serialization (sequential consistency), coordination ...

Zookeeper can store a rather small amount of data distributed across many servers. The best performance is reached, when the read/write ratio is > 10.

znodes can't be bigger than 1MB.

znodes are like directories/files that can contain data and can have a parent and children ephermeral nodes are session nodes that only live as long as the application (zookeeper session) is running. znodes and ephermeral nodes can be sequential nodes. They just add a number in sequence at the end of the node.

Any client must first establish a session with zookeeper in order to run any operation. Operations within the same session (must also be on the same thread) are guaratneed to be executed in the submitted order.

Clients connect to a server in random order. The client can not prefer a specific server to connect to, to enable load balancing.

Zookeeper supports different kinds of locks (read/write, global).

It is possible to watch for changes of znodes.

The quorum in zookeeper is the minimum number of Server that have to be running and available for zookeeper to work.

Guarantees

  • Sequential Consistency - Updates from a client will be applied in the order that they were sent.
  • Atomicity - Updates either succeed or fail. No partial results.
  • Single System Image - A client will see the same view of the service regardless of the server that it connects to.
  • Reliability - Once an update has been applied, it will persist from that time forward until a client overwrites the update.
  • Timeliness - The clients view of the system is guaranteed to be up-to-date within a certain time bound.

NOT guaranteed is:

  • strong consistency across different clients
  • sync method can help to ensure catchup with leader

A zookeeper cluster is called an ensemble. It consists of:

  • leader: al writes go through the leader and it commits them to all followers
  • follower: reads/writes
  • observer: no writes

the number of leaders and followers together should be odd, to ensure majority in the voting process.

@andineck

This comment has been minimized.

Copy link
Owner Author

@andineck andineck commented Jan 27, 2016

@andineck

This comment has been minimized.

Copy link
Owner Author

@andineck andineck commented Feb 8, 2016

@andineck

This comment has been minimized.

Copy link
Owner Author

@andineck andineck commented Feb 8, 2016

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment