Technical Specification
Design Architecture of Datamill
=====================================
## Core Requirements
##### **High-Level (User-Facing) Design Requirements**
- **Multi-Tenancy:** Ability to service multiple users simultaneously
- **Low-Latency:** Wherever possible, data should be no more than 15-20 minutes stale.
- **Reliability:** Services should run with *> 99.99%* uptime. Data recovery should be possible in the event of service or hardware failure.
- **Ability to join queries across data sources:** Possible join keys include shared columns between data such as date, time, session_id, geo/location, and so on.
- **Agnostic to input source:** Ingest data regardless of its origin, whether an API, a database, or raw files.
- **Security:** Ability to delegate isolated permissions to individual groups of users within a shared resource. (e.g. a multi-tenant database with a different schema for each user)
##### **Technical Design Requirements**
- **Horizontal Scalability:** The ability to increase computational capacity by launching additional servers is a critical aspect of hosting services which need to accommodate changes in data volume. The load should ideally be shared reasonably equally amongst nodes in the cluster.
- **Schema Awareness:** Infer structure within existing data where applicable.
- **Multiplexing:** The ability to process multiple inputs and outputs simultaneously
- **Multiprocessing:** Ability to run more than one task, or even more than one pipeline, concurrently.
- **Decoupling:** Coupling must be kept to a minimum, both between stages of the data pipeline and between the pipeline and the services it depends on.
- **Idempotency:** A task that is run twice should not result in a different state than if it were run once (see the sketch after this list).
- **Isolation:** Applications should be able to run within their own sandbox without relying on outside processes (Docker)
- **Automation:** High level configuration with minimal necessary input. Low level details should be abstracted where possible.
- **Statelessness:** Runtime code should be stateless wherever possible. Only the state of the data being processed should be modified.
- **Testing:** *All code and features should be covered by automated unit tests and integration tests.* Service and API calls should be mocked and stubbed where necessary.
- **Instrumentation/Monitoring:** Processes and services should report memory/CPU usage and should provide stack traces or detailed error messages when failures occur.
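To illustrate the idempotency requirement above, here is a minimal Python sketch (not production code; the deterministic key scheme and the in-memory `store` are hypothetical stand-ins for S3 or another object store). Because the output location is derived purely from the task's inputs, running the task twice overwrites the same object instead of duplicating it:
```
# Idempotency sketch: output goes to a deterministic key derived from the
# inputs, so re-running the task overwrites rather than duplicates.
import hashlib
import json


def run_task(batch_date, records, store):
    """Process one batch; safe to run any number of times."""
    key = "processed/{}/{}.json".format(
        batch_date, hashlib.sha1(batch_date.encode()).hexdigest()[:8]
    )
    transformed = [dict(r, processed=True) for r in records]
    store[key] = json.dumps(transformed)  # overwrite, never append
    return key


store = {}                                # stand-in for S3
k1 = run_task("2015-09-25", [{"id": 1}], store)
k2 = run_task("2015-09-25", [{"id": 1}], store)
assert k1 == k2 and len(store) == 1       # the second run changes nothing
```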
##### **Best Practices**
- **Logging:** Logs should be separated by notification level (*Fatal*, *Error*, *Warn*, *Info*, *Debug*). As noted above, *Fatal*- and *Error*-level exceptions should issue a notification either to email or Slack. Logs should be consolidated and accessible without needing to rely on SSH.
- **Documentation:** Each code repository should have a README with detailed instructions on how to build, run, and test the code.
## Application Architecture
### The Data Pipeline
At the heart and soul of Datamill is the data pipeline, responsible for concurrently processing data from many input sources. A single packet of data within the pipeline passes sequentially from one stage to the next, with each stage performing a single, specific transformation or operation on the data it receives (i.e. a *"Chain of Responsibility"*). Data passing between stages can be buffered in a queue, which keeps the coupling between stages loose. The pipeline thus decomposes into discrete tasks, with each task acting independently on each group of data in a "first-in, first-out" manner.
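A minimal Python sketch of this idea (the stages and the in-process queue are illustrative only; in Datamill the buffer between stages would be a persistent cache such as S3):
```
# Chain-of-responsibility sketch: each stage performs one operation and the
# data moves between stages through a FIFO buffer.
from queue import Queue


def input_adapter(record):
    """Map raw input into a common format."""
    return {"value": record}


def transform(record):
    """Apply a single, specific transformation."""
    record["value"] = record["value"] * 2
    return record


def run_pipeline(stages, source):
    buffer = Queue()
    for item in source:
        buffer.put(item)            # first in ...
    results = []
    while not buffer.empty():
        item = buffer.get()         # ... first out
        for stage in stages:        # hand off from one stage to the next
            item = stage(item)
        results.append(item)
    return results


print(run_pipeline([input_adapter, transform], [1, 2, 3]))
# => [{'value': 2}, {'value': 4}, {'value': 6}]
```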
### Components of the pipeline
![Datamill pipeline diagram](https://lh3.googleusercontent.com/UZQKt_eluIf4c7u2OGav4qoeoJ7th98NQAZa8e2zCQ=s0 "DatamillPipeline.png")
- **Source Data (left):** Original source of the data. Can be in the form of an API, a database, a raw text file, a CSV file, etc.
- **Chronos Scheduler (magenta):** The Chronos scheduler runs jobs or tasks that must be run at specific intervals or times of day.
- **Input Cache:** Stores data in a buffer until it is ready for subsequent processing by the Input Adapter.
- **Input Reader/Importer task:** Connects, authenticates, and pulls data from the input source into a buffer or cache.
- **Input Adapter:** Performs initial mapping of the input data values into a common format for further processing.
- **Transform/Reshape:** More advanced processing on the data including reshaping, extracting custom fields, and performing any necessary data enrichment.
- **Output Cache:** Serves a similar purpose to the input cache: a persistent data store that buffers intermediate results between stages in the pipeline.
- **Output Writer:** Connects, authenticates, and pushes data into the output destination.
- **Output Destination:** Repository to store final processed form of data. In most cases this is a database or store which provides a simple and unified query interface to the data. Examples are Redshift, ElasticSearch, and Hadoop.
- **Configuration Provider:** Service that provides protected Kernel information to each service in the cluster
### Layers of the pipeline
Typically tasks fall into one of three layers: **Extract** *(A)*, **Transform** *(B)*, and **Load** *(C)*.
#### A. Extract
1. **Connect** and authenticate with source data (e.g. API/DB/Filesystem)
2. **Pull** data from the source into a temporary buffer such as `/tmp` or an S3 bucket (a sketch of such a task follows this list).
3. **Repeat:** New data is extracted in two circumstances
- *Streaming:* As new data arrives.
- *Batch:* If notified by external service (e.g. Chronos).
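A sketch of an Extract task in Python, under the assumption that the source is an HTTP API and the buffer is an S3 bucket; the endpoint, token, bucket, and key below are placeholders, not real Datamill values:
```
# Extract sketch: connect/authenticate, pull into a local buffer, then stage
# the raw data in S3 for the next stage of the pipeline.
import boto3
import requests


def extract(api_url, api_token, bucket, key):
    # 1. Connect and authenticate with the source.
    resp = requests.get(api_url, headers={"Authorization": "Bearer " + api_token})
    resp.raise_for_status()

    # 2. Pull the data into a temporary local buffer ...
    local_path = "/tmp/extract_buffer.json"
    with open(local_path, "w") as f:
        f.write(resp.text)

    # ... and stage it in S3 for subsequent processing.
    boto3.client("s3").upload_file(local_path, bucket, key)

# 3. Repeat: in batch mode a scheduler such as Chronos invokes this task on a
#    fixed interval; in streaming mode it is triggered as new data arrives.
```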
#### B. Transform
4. **Input Adapter Reshape:** Preliminary transformation to map data to a common format.
5. **Transform:** Apply one or many transformations to the data (a sketch follows at the end of this section):
*Basic Transformations*
- **Impute:** Fill NA's/missing values
- **Enrich:** Augment data by creating calculated columns from existing data (e.g. encode location from IP address, parse user-agent)
- **Select/Project:** Drop keys/columns from data.
- **Filter:** Drop rows/items based on condition
- **Flatten:** Extract nested parameters to parent.
- **Rename:** Rename a key
- **Group:** Group by a key
- **Parse:** Convert string to Date, State/City, Zipcode, etc.
*Advanced Transformations:*
- **Standard scale**
- **Normalize**
- **TfIdf**
- **PCA**
- **Binarize**
- **Re-encode**
6. Save the transformed result in a buffer/cache.
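For illustration, a few of the basic transformations above expressed with pandas (the columns and data are made up; this is a sketch, not Datamill's transform code):
```
# Transform sketch: impute, parse, enrich, project, filter, rename, group.
import pandas as pd

df = pd.DataFrame({
    "session_id": ["a", "a", "b", None],
    "revenue":    [1.0, None, 3.0, 4.0],
    "created_at": ["2015-09-01", "2015-09-02", "2015-09-02", "2015-09-03"],
    "debug_field": [1, 2, 3, 4],
})

df["revenue"] = df["revenue"].fillna(0.0)              # Impute missing values
df["created_at"] = pd.to_datetime(df["created_at"])    # Parse strings to dates
df["day"] = df["created_at"].dt.date                   # Enrich: calculated column
df = df.drop(columns=["debug_field"])                  # Select/Project: drop a column
df = df[df["session_id"].notnull()]                    # Filter rows on a condition
df = df.rename(columns={"revenue": "revenue_usd"})     # Rename a key
daily = df.groupby("day")["revenue_usd"].sum()         # Group by a key
print(daily)
```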
#### C. Load
1. Connect and authenticate with output data store
2. Map or save the transformed data into a format consistent with the output datastore (CSV in the case of Redshift, JSON in the case of ElasticSearch)
3. Copy/load/insert the data into the destination data store (sketched below).
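A sketch of the Load step for a Redshift destination, assuming the transformed data has already been written to S3 as CSV; the cluster endpoint, credentials, table, and IAM role are placeholders:
```
# Load sketch: connect to Redshift and bulk-load the staged CSV files via COPY.
import psycopg2

conn = psycopg2.connect(
    host="redshift-cluster.example.us-east-1.redshift.amazonaws.com",
    port=5439, dbname="dbname", user="dbuser", password="dbpassword",
)
with conn, conn.cursor() as cur:
    # COPY from S3 is far faster than row-by-row INSERTs for Redshift.
    cur.execute("""
        COPY events
        FROM 's3://destination-bucket/processed/events_'
        IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftCopyRole'
        FORMAT AS CSV;
    """)
conn.close()
```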
### 2.3 Importing a Database to Redshift
#### 2.3.1 Postgres -> Redshift
1. Export the database tables as raw CSV files (e.g. with `pg_dump`/`COPY ... TO ... CSV`)
2. Load the CSV tables into a Spark DataFrame
3. Use `spark-redshift` to import the DataFrame into Redshift (sketched after the diagram below)
![Postgres-to-Redshift import flow](https://lh3.googleusercontent.com/-tdu4J6Z0wwY/VbepmOeTqaI/AAAAAAAAADQ/djO_N3CJk2Y/s0/PgToRedshift.png "PgToRedshift.png")
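A sketch of steps 2-3 in PySpark (Spark 1.4-era APIs); the S3 paths, table name, JDBC URL, and credentials are placeholders, and it assumes the `spark-csv` and `spark-redshift` packages are available on the classpath:
```
# Load the dumped CSVs into a DataFrame and push it to Redshift via
# spark-redshift, which stages the data in S3 and issues a COPY internally.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="PgToRedshift")
sqlContext = SQLContext(sc)

# 2. Load the exported CSV tables into a Spark DataFrame.
df = (sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .load("s3n://destination-bucket/pg_dump/users_*.csv"))

# 3. Use spark-redshift to import the DataFrame into Redshift.
(df.write.format("com.databricks.spark.redshift")
   .option("url", "jdbc:redshift://host:5439/dbname?user=dbuser&password=dbpassword")
   .option("dbtable", "users")
   .option("tempdir", "s3n://destination-bucket/tmp/")
   .mode("append")
   .save())
```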
Operations Architecture
=====================================
##### *Architecture of the components that run Datamill*
At the hardware layer, Datamill consists of a cluster of several m3.xlarge virtual machines, each running Mesos, a distributed OS responsible for delegating shared resources on the cluster (a pool of RAM and CPU across all slaves). A single machine in the cluster is designated as the master. At an abstraction layer above the Mesos OS is the notion of a service: a running process responsible for performing a certain task (fetching input, making API calls, transforming data in S3).
![Mesos software stack](https://gigaom.com/wp-content/uploads/sites/1/2014/02/stackmesos.png?w=708)
#### Hardware Layer:
A cluster of `m3.xlarge` EC2 instances within the same VPC, which can be scaled horizontally on demand.
#### Kernel/Distributed OS Layer:
Apache Mesos provides the "backbone" of the software stack and acts as a black box responsible for allocating and provisioning shared resources to each of the respective applications running across the cluster. Using a shared pool of resources allows multi-tenant applications to run concurrently while efficiently sharing memory and CPU.
#### Framework/API Layer:
The framework layer consists of higher-level frameworks and languages such as Spark, Chronos, Ruby, or Python, which provide an API for application code or other specialized services to run against. Applications on the framework layer are typically registered with Mesos and may run on one or more instances in the cluster.
#### Application Layer:
The application layer runs all of the proprietary code within Datamill and is specifically responsible for input/output, data processing, and any related logic. Applications usually run within the context of a framework and are deployed as self-contained Docker containers.
![Spark cluster overview](http://spark.apache.org/docs/latest/img/cluster-overview.png)
The application layer is how Datamill interacts with the outside world; all data that goes into or out of Datamill passes directly through it.
## On the backburner
Think of this as "food for thought" -- stuff we don't have time for right now but may want to revisit in the future.
- **Aggregating S3 files:** Dealing with Hadoop's small file problem
Development and Workflow Guidelines
=====================================
Just as we established a core tenet for product and business decisions, we likewise establish a core tenet for development:
> Above all else, Simplicity.
There are several core domains involving development:
## Teamwork Principles
*(These points are pretty trite, I know, but I at least wanted to put this stuff on paper)*
- **Humility:** We're all idiots on some things.
- **Ask questions, always.**
- **Flag issues; sound the alarm if necessary:** This was originally part of the previous point until I realized it needed to be a point in its own right.
## Workflow Principles
- Task Management in JIRA
- Slack communication should be in public channels whenever possible.
- **Don't spin your wheels:** If you're stuck on a problem, move on to something else. *You shouldn't code for more than 30 minutes without committing something.* Unless you're using CVS, in which case you're probably not doing much of anything other than managing CVS.
## Development Principles
- Write your code as though it will be read by others.
- Scan diff of changes before committing
- Use semantic versioning http://semver.org/
- Please read the following: https://pragprog.com/the-pragmatic-programmer/extracts/tips
- **Spaces (2) instead of tabs** Developers who use typewriters, printing presses, and/or Windows ME are exempt from this rule.
- Try to keep lines under the 120-character limit wherever possible.
## Testing Principles
- All key functionality should be tested
- Test locally using sample data before testing on a cluster
## Source Control Principles
Some more required reading: https://sethrobertson.github.io/GitBestPractices/
- **Master should always be in a deployable state** (i.e. tested and qualified as production code)
- **Commit early and commit often:** This is really important not only for establishing a productive workflow, but also because it makes debugging with `git bisect` much, much simpler. Generally, you should commit several times a day and push to the remote just as often, unless your code isn't ready to be seen by others. http://blog.codinghorror.com/check-in-early-check-in-often/. I have a status message at my command prompt which shows the time since my last commit.
- **All commits on master must pass tests.** This keeps master deployable and other team members unblocked.
- **Branch names should be prefixed with the initials of the branch creator, followed by a `/`.** For instance, `git checkout -b ae/my-new-branch`.
- **Experimental branches should be prefixed with `test`.** For Anthony Erlinger, for example, an experimental branch would be `ae/test-mixpanel-api`.
- **Branch names should use `spine-casing`.** It's a little easier to read and type than underscores.
- **Keep the first line of a git commit message under 80 characters.** Longer descriptions can be specified on subsequent lines if necessary.
- **Altering history:** Don't alter history of any code that's been pushed. It's OK to alter history locally as long as it hasn't been pushed.
- **Always scan and review the git diff of changes to be committed before committing.** Personally, I always use `git commit -v`, which shows all changes inline in your default editor. I recommend creating an alias to make getting into this habit easier (I use `alias gc='git commit -v'` in my `.profile`, which gets sourced by my shell).
- **Use GitHub pull requests to facilitate collaboration and discussion about proposed changes, rather than merging from the command line.** It's also much easier to visually track proposed changes in code within the GitHub pull request.
- Your .gitignore should contain `target`
## Dev Ops Principles
- You should never need to SSH into an instance in order to deploy something; if you do, you're doing it wrong.
- All EC2 instances should be considered **ephemeral**. In other words, an instance can be stopped and destroyed, and a new one built and put in its place, with an absolute minimum of setup and configuration.
- Don't create ad-hoc configuration on a per-machine basis.
## AWS Principles
First, read the full article here: https://docs.google.com/document/d/1-WqIXwjr4my21lIlRPR5-bZtaEMqxQG6DTtvy8jKFYw/edit
Just to reiterate some important points:
- When creating any resource in AWS, post a message in the #datafreedom channel on Slack. This is to make sure we don't end up with duplication.
- Sensitive data should never be readable as plaintext. The only exception is environment variables.
- On the same note, think twice before creating any resource, including basic items like permissions, keys, buckets, etc. These things accumulate quickly and like anything else, it's much harder to remove stale objects than to create them.
- Use MFA for login (this will be enforced soon)
- The AWS CLI and s3cmd are great tools for interacting with AWS; please familiarize yourself with them.
## Security Principles
- Don't send passwords over Slack. Use Meldium for this.
# Appendix
## A.1. Example Code (WIP)
### Example Deployment Procedure for a Spark Standalone Process
> See the README in the main Datamill repository: https://github.com/methodmill/OperationDataFreedom
### Deploying a Spark process under Mesos
```
docker run smartislav/mesos-spark:1.4.0-2 \
--deploy-mode cluster \
--master mesos://<ip.of.mesos.cluster>/service/sparkcli \
--class com.methodmill.Datamill.transformation.Transformer \
--driver-cores 1 \
--driver-memory 2G \
--executor-memory 4G \
--conf="spark.mesos.executor.docker.image=mesosphere/spark:1.4.0-rc4-hdfs" \
https://s3.amazonaws.com/Datamill-spark-jars/Datamill-redshifter.jar \
--adapter mParticle \
--src "s3n://destination_bucket/prefix_*.txt" \
--dst "s3n://destination-bucket" \
--db dbname \
--dbPw "dbpassword" \
--partitions 16
```
### Deploying a new single-node application
#### 1. Create a new application
*A very basic application:*
```
mkdir sample_application && cd sample_application
# Just echo some ENV variables
echo "ruby -e 'puts ENV.keys'" > application.sh
```
#### 2. Commit and push the application to Github
#### 3. Pull a base docker container in which you want your code to run
```
docker pull ruby
```
#### 4. Run the docker image and modify it as necessary
```
docker run -it ruby /bin/sh
$ git clone <path_to_repo>
```
#### 5. Exit the Docker instance and find the newly modified image in the list of installed images:
```
$ docker images
```
#### 6. Commit the changes with `docker commit` and push them to Docker Hub
```
$ docker commit -m "cloned repo" -a "Anthony Erlinger" e2b9f7436630 aerlinger/ruby_example:v1
$ docker push aerlinger/ruby_example
```
#### 7. Run the image from the cluster
The task can now be run anywhere with the following command:
`docker run aerlinger/ruby_example:v1 /bin/sh -c 'cd /root/ruby_example && ./path_to_repo'`
The above command can be run as-is within Chronos.
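As an illustration, the same task could be registered as a scheduled Chronos job through its REST API (`POST /scheduler/iso8601`). The Chronos host/port, schedule, resource figures, and owner below are placeholders, and `./application.sh` refers to the script created in step 1:
```
# Register the Docker task above as a recurring Chronos job.
import json
import requests

job = {
    "name": "ruby-example",
    "schedule": "R/2015-09-26T02:00:00Z/PT24H",   # repeat every 24 hours
    "command": "cd /root/ruby_example && ./application.sh",
    "container": {"type": "DOCKER", "image": "aerlinger/ruby_example:v1"},
    "cpus": 0.5,
    "mem": 512,
    "owner": "ops@example.com",
}

resp = requests.post(
    "http://chronos.example.com:4400/scheduler/iso8601",
    headers={"Content-Type": "application/json"},
    data=json.dumps(job),
)
resp.raise_for_status()
```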
### A.2 Additional Resources
#### Mesos
[SparkSummit presentation on Mesos with Spark](https://www.youtube.com/watch?v=WHVbpx6b1CM)
#### Guidelines
[12 Factor App development](http://12factor.net/)