Integration Engineering Exercise: Vela.Exchange Data Pipeline

Objective

The goal of this exercise is to demonstrate your ability to integrate a decentralized platform, specifically vela.exchange, into our existing data pipeline. Your task is to ensure that every trade, along with any other relevant information from vela.exchange, is captured and stored in our database.

Understanding Our Architecture

Before you begin, familiarize yourself with our current infrastructure, which is outlined below. Understanding the flow of data and the structure of our database tables is crucial for the successful completion of this exercise.

Infrastructure Overview

+-----------------------------+            +--------------------+            +--------------------+            +-------------------------------+            +-------------------------+
| Infrastructure Crypto Nodes |            | The Fetcher        |            | Apache Kafka Queue |            | Spark Streaming               |            | Superset PostgresDB     |
| (e.g., ARB, Opt, ...)       |   Blocks   | (Custom Component) |   Blocks   | (Consumes blocks)  |   Parsed   | (Data Parsing Implementation) |   Parsed   | (Data available for     |
|                             | ---------> |                    | ---------> |                    | ---------> |                               | ---------> | frontend visualization) |
+-----------------------------+            +--------------------+            +--------------------+            +-------------------------------+            +-------------------------+

This flowchart illustrates how data moves through our system. Your task will focus on integrating vela.exchange data into this pipeline.
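
For orientation, here is a minimal sketch of inspecting the raw block messages on the queue with the kafka-python client. The topic name (raw_blocks) and broker address are illustrative assumptions, not values from our configuration.

from kafka import KafkaConsumer  # assumes the kafka-python package is installed

# NOTE: topic name and broker address are placeholders/assumptions
consumer = KafkaConsumer(
    "raw_blocks",                        # assumed topic carrying serialized blocks
    bootstrap_servers="localhost:9092",  # assumed broker address
    auto_offset_reset="earliest",
)

for message in consumer:
    # Each message value is one serialized block as published by the Fetcher
    print(message.value[:200])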

Development Environment Setup

To begin development, you'll need to run the provided Docker container, which simulates our infrastructure. This container includes access to a PostgresDB instance reflecting the expected table format, so you can validate your output against it.

Dummy Docker Fetch Instruction

docker pull [your-docker-image]
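
Once the container is running, it is worth confirming that the bundled PostgresDB is reachable and inspecting the expected table layout. The sketch below uses psycopg2; the connection parameters and the trades table name are illustrative assumptions, so substitute the values shipped with the container.

import psycopg2  # assumes psycopg2-binary is installed

# NOTE: connection parameters and table name are placeholders/assumptions
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="superset",
    user="postgres",
    password="postgres",
)

with conn.cursor() as cur:
    # List the columns of the expected trades table to validate your schema against
    cur.execute(
        """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = %s
        ORDER BY ordinal_position
        """,
        ("trades",),
    )
    for column_name, data_type in cur.fetchall():
        print(column_name, data_type)

conn.close()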

Acquiring Test Data

Connect to the following node to obtain test data. Ensure you monitor the list of relevant smart contracts provided.

Dummy Node Connection Instruction

ssh user@xxx.xxx.xxx.xxx
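
To get a feel for the test data before wiring up the full pipeline, the sketch below pulls the latest block over the node's RPC endpoint with web3.py and filters its transactions against the monitored contracts. The RPC URL (including the port) and the contract list are placeholders; use the endpoint and smart-contract list provided with the exercise.

from web3 import Web3  # assumes the web3 package is installed

# NOTE: RPC URL and contract addresses are placeholders/assumptions
RPC_URL = "http://xxx.xxx.xxx.xxx:8545"
VELA_CONTRACTS = [
    # "0x...",  # add addresses from the provided smart-contract list (lowercase)
]

w3 = Web3(Web3.HTTPProvider(RPC_URL))

latest = w3.eth.block_number
block = w3.eth.get_block(latest, full_transactions=True)

# Keep only transactions addressed to the monitored contracts
relevant = [
    tx for tx in block.transactions
    if tx["to"] and tx["to"].lower() in VELA_CONTRACTS
]
print(f"Block {latest}: {len(relevant)} relevant transaction(s)")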

Deliverable Details

Your deliverable for this exercise is a Python file that utilizes Apache Spark's streaming capabilities to parse and persist data from vela.exchange into our system's database. Below are the detailed requirements and structure for your Python file.

Python File Requirements

  1. Spark Session Initialization: Initialize a Spark session to enable the application to connect to the Spark cluster.
  2. Data Source Connection: Establish a connection to the Apache Kafka queue that receives the blocks of data.
  3. Streaming Data Processing: Utilize Spark Streaming to process the data in real-time as it arrives from Kafka.
  4. Data Parsing: Write functions to parse the blocks received from vela.exchange, extracting the necessary trade information.
  5. Database Persistence: Ensure that the parsed data is correctly formatted and inserted into the Superset PostgresDB (see the JDBC write sketch after this list).
  6. Error Handling: Implement robust error handling to manage any interruptions in the data stream or parsing errors.
  7. Logging: Include logging throughout your application to capture the flow of data and any issues that arise.
  8. Scalability Considerations: Write the code with scalability in mind, allowing for processing of increasing volumes of data.
  9. Comments and Documentation: Provide detailed comments within the code to explain the logic and any complex sections.
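
As a starting point for requirements 1 and 5, here is a minimal sketch of initializing a session and writing a batch of parsed rows to Postgres through Spark's built-in JDBC writer. The JDBC URL, credentials, table name (vela_trades), and column names are illustrative assumptions, and the PostgreSQL JDBC driver is assumed to be on the Spark classpath.

from pyspark.sql import SparkSession

def initialize_spark_session():
    # Requirement 1: create (or reuse) a Spark session
    return (
        SparkSession.builder
        .appName("vela-exchange-ingest")
        .getOrCreate()
    )

def write_trades_to_postgres(trades_df):
    # Requirement 5: persist parsed trades via Spark's JDBC writer
    # NOTE: URL, table name, and credentials are placeholders/assumptions
    (
        trades_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/superset")
        .option("dbtable", "vela_trades")
        .option("user", "postgres")
        .option("password", "postgres")
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save()
    )

if __name__ == "__main__":
    spark = initialize_spark_session()
    # Hypothetical parsed rows; the real schema must match the validation DB
    sample = spark.createDataFrame(
        [("0xabc", "BTC/USD", 1.5, 42000.0)],
        ["tx_hash", "pair", "size", "price"],
    )
    write_trades_to_postgres(sample)

In the streaming job this write would typically happen once per micro-batch rather than once per record.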

Example Python File Structure for PySpark

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
# Other necessary imports

def initialize_spark_session():
    # Code to initialize and return a Spark session
    pass

def connect_to_kafka(ssc):
    # Code to connect to the Kafka queue and return a stream of blocks
    pass

def parse_block(block):
    # Code to parse a block and extract trade information
    pass

def insert_to_db(parsed_data):
    # Code to insert parsed data into the PostgresDB
    pass

def main():
    spark = initialize_spark_session()

    # Create a StreamingContext from the Spark session (batch duration in seconds)
    ssc = StreamingContext(spark.sparkContext, batchDuration=2)

    # Connect to Kafka and obtain the stream of blocks
    kafka_stream = connect_to_kafka(ssc)

    # Define the processing of each RDD from the stream
    def process_rdd(rdd):
        # Parse each block in the RDD
        parsed_data = rdd.map(parse_block)

        # Insert the parsed data into the database
        parsed_data.foreach(insert_to_db)

    # Process each batch of data
    kafka_stream.foreachRDD(process_rdd)

    # Start the streaming computation and wait for it to finish
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
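
The exact block layout published by the Fetcher is not specified above, so the following is only a hedged illustration of what parse_block might do if each Kafka message were a JSON-serialized block. The field names (transactions, to, hash, input, number), the output keys, and the contract list are assumptions for illustration.

import json

# NOTE: contract addresses and the message layout are placeholders/assumptions
VELA_CONTRACTS = [
    # "0x...",  # add addresses from the provided smart-contract list (lowercase)
]

def parse_block(raw_message):
    # Assume each Kafka message is one JSON-serialized block
    block = json.loads(raw_message)

    trades = []
    for tx in block.get("transactions", []):
        to_address = (tx.get("to") or "").lower()
        if to_address in VELA_CONTRACTS:
            trades.append({
                "block_number": block.get("number"),
                "tx_hash": tx.get("hash"),
                "contract": to_address,
                "input_data": tx.get("input"),  # decode per contract ABI downstream
            })
    return trades

Because this version returns a list of trades per block, rdd.map(parse_block) yields one list per block; rdd.flatMap(parse_block) would flatten the output into individual trade records before insertion.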



Evaluation Criteria

  • Integration: How well does the solution integrate with our existing pipeline?
  • Complexity Handling: Identification and management of any complexities or issues encountered.
  • Scalability: Can the solution handle scaling up in terms of data volume and transaction speed?
  • Rationale: Clear explanation of any compromises made and their reasoning.

Good luck with the exercise!