The goal of this exercise is to demonstrate your ability to integrate a decentralized platform, specifically vela.exchange, into our existing data pipeline. Your task is to ensure that every trade and other relevant information from vela.exchange is captured and stored in our database.
Before you begin, familiarize yourself with our current infrastructure, which is outlined below. Understanding the flow of data and the structure of our database tables is crucial for the successful completion of this exercise.
```
+---------------------------+        +----------------------+        +----------------------+        +----------------------+        +----------------------------+
| Infrastructure            |        | The Fetcher          |        | Apache Kafka Queue   |        | Spark Streaming      |        | Superset PostgresDB        |
| Crypto Nodes              | Blocks | (Custom Component)   | Blocks | (Consumes blocks)    | Blocks | (Data Parsing        | Parsed | (Data available for        |
| (e.g., ARB, Opt, ...)     | -----> |                      | -----> |                      | -----> |  Implementation)     | -----> |  frontend visualization)   |
+---------------------------+        +----------------------+        +----------------------+        +----------------------+        +----------------------------+
```
This flowchart illustrates how data moves through our system. Your task will focus on integrating vela.exchange data into this pipeline.
To begin development, you'll need to run the provided Docker container which simulates our infrastructure. This container includes access to a PostgresDB instance reflecting the expected table format for validation purposes.
```shell
docker pull [your-docker-image]
```
Connect to the following node to obtain test data. Ensure you monitor the list of relevant smart contracts provided.
Your deliverable for this exercise is a Python file that utilizes Apache Spark's streaming capabilities to parse and persist data from vela.exchange into our system's database. Below are the detailed requirements and structure for your Python file.
- Spark Session Initialization: Initialize a Spark session to enable the application to connect to the Spark cluster.
- Data Source Connection: Establish a connection to the Apache Kafka queue that receives the blocks of data.
- Streaming Data Processing: Utilize Spark Streaming to process the data in real-time as it arrives from Kafka.
- Data Parsing: Write functions to parse the blocks received from vela.exchange, extracting the necessary trade information.
- Database Persistence: Ensure that the parsed data is correctly formatted and inserted into the Superset PostgresDB.
- Error Handling: Implement robust error handling to manage any interruptions in the data stream or parsing errors.
- Logging: Include logging throughout your application to capture the flow of data and any issues that arise.
- Scalability Considerations: Write the code with scalability in mind, allowing for processing of increasing volumes of data.
- Comments and Documentation: Provide detailed comments within the code to explain the logic and any complex sections.
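To make the parsing requirement concrete, here is a minimal sketch of what a `parse_block` function might look like. It assumes each Kafka message decodes to a simplified JSON block with a `transactions` list, and that each transaction carries `to`, `hash`, and a `trade` payload; the contract addresses and field names are placeholders, since the real vela.exchange event layout will differ:

```python
import json

# Hypothetical set of monitored vela.exchange contract addresses (placeholders;
# use the list of relevant smart contracts provided with the exercise).
RELEVANT_CONTRACTS = {"0xvela_vault", "0xvela_position_router"}

def parse_block(raw_block: str) -> list[dict]:
    """Extract trade records from a JSON-encoded block.

    Assumes a simplified block layout: {"number": ..., "transactions": [...]},
    where each transaction has "to", "hash", and an optional "trade" payload.
    """
    block = json.loads(raw_block)
    trades = []
    for tx in block.get("transactions", []):
        # Keep only transactions addressed to monitored contracts.
        if tx.get("to") not in RELEVANT_CONTRACTS:
            continue
        trade = tx.get("trade") or {}
        trades.append({
            "block_number": block["number"],
            "tx_hash": tx["hash"],
            "market": trade.get("market"),
            "size": trade.get("size"),
            "price": trade.get("price"),
        })
    return trades
```

In the Spark job, a function of this shape would run per record (e.g., via `rdd.map`); the point here is only the filter-then-extract pattern, not the exact schema.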
```python
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
# Other necessary imports (e.g., json, logging, a PostgreSQL driver)

def initialize_spark_session():
    # Code to initialize the Spark session
    pass

def connect_to_kafka(ssc):
    # Code to connect to the Kafka queue and read blocks as a stream
    pass

def parse_block(block):
    # Code to parse a block and extract trade information
    pass

def insert_to_db(parsed_data):
    # Code to insert parsed data into the PostgresDB
    pass

def main():
    spark = initialize_spark_session()

    # Create a StreamingContext from the Spark session
    ssc = StreamingContext(spark.sparkContext, batchDuration=2)  # Batch duration in seconds

    # Connect the stream to the Kafka queue
    kafka_stream = connect_to_kafka(ssc)

    # Define the processing of each RDD from the stream
    def process_rdd(rdd):
        # Parse each block in the RDD
        parsed_data = rdd.map(parse_block)
        # Insert the parsed data into the database
        parsed_data.foreach(insert_to_db)

    # Process each batch of data
    kafka_stream.foreachRDD(process_rdd)

    # Start the streaming computation
    ssc.start()
    # Wait for the streaming to finish
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
```
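For the persistence step, one common pattern is to turn the parsed trade dicts into parameter tuples for a batched, parameterized INSERT. The sketch below assumes a hypothetical `vela_trades` table and column set; the real schema should come from the validation PostgresDB in the Docker container, and the actual write would typically happen per partition using a driver such as psycopg2:

```python
# Hypothetical target table and columns (placeholders); align these with the
# schema exposed by the container's PostgresDB instance.
COLUMNS = ("block_number", "tx_hash", "market", "size", "price")
INSERT_SQL = (
    f"INSERT INTO vela_trades ({', '.join(COLUMNS)}) "
    f"VALUES ({', '.join(['%s'] * len(COLUMNS))})"
)

def rows_for_insert(parsed_trades):
    """Turn parsed trade dicts into parameter tuples for executemany()."""
    return [tuple(trade.get(col) for col in COLUMNS) for trade in parsed_trades]

# Inside insert_to_db, per partition, you would then open a connection and run
# something like: cursor.executemany(INSERT_SQL, rows_for_insert(trades))
```

Batching the inserts per partition (rather than one connection per record) is one of the scalability considerations the requirements ask you to address.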
## Evaluation Criteria
- Integration: How well does the solution integrate with our existing pipeline?
- Complexity Handling: Identification and management of any complexities or issues encountered.
- Scalability: Can the solution handle scaling up in terms of data volume and transaction speed?
- Rationale: Clear explanation of any compromises made and their reasoning.
Good luck with the exercise!