Skip to content

Instantly share code, notes, and snippets.

class Task:
    """Base class for a job that runs as extract -> transform -> load.

    ``main`` calls ``self.extract``, ``self.transform`` and ``self.load``.
    None of the three is defined in the visible part of this class, so they
    are presumably supplied by subclasses -- TODO confirm.
    """

    def __init__(
        self,
        spark: Optional["SparkSession"] = None,
        config: Optional["JobConfig"] = None,
    ):
        """Store the session and the job configuration on the instance.

        Args:
            spark: Stored as ``self.spark``; defaults to ``None``.
            config: Stored as ``self.config``; defaults to ``None``.
        """
        self.config = config
        self.spark = spark

    def main(self):
        """Run the three steps in order and return whatever ``load`` returns.

        ``extract`` has to return a mapping, because its items are passed to
        ``transform`` as keyword arguments.
        """
        extracted = self.extract()
        transformed = self.transform(**extracted)
        return self.load(transformed)
@PipelineDecorator.component(return_values=['result_dataset_id'], cache=True, task_type=TaskTypes.data_processing)
def split_videos(dataset_id):
    """Return the tracker's cached result for ``dataset_id`` when there is one.

    Args:
        dataset_id: Passed unchanged to ``tracker.init_task``.

    Returns:
        The cached result reported by the tracker if it is truthy. The
        visible part of this function does nothing else, so it otherwise
        falls through and returns ``None``.
    """
    # Kept as a function-local import, as in the original.
    # NOTE(review): presumably required because the component body is
    # executed as a standalone remote task -- confirm against ClearML docs.
    from tracking_factory import TrackingFactory

    tracker = TrackingFactory.get_tracker()
    # The original unpacked into ``cached_results`` but tested
    # ``cached_result``; one name is now used throughout.
    # ``dataset_folder`` is not used by the visible part of this function.
    cached_result, dataset_folder = tracker.init_task(dataset_id)
    if cached_result:
        return cached_result
@assapin
assapin / pipeline.py
Last active October 9, 2022 17:03
Clearml Pipeline from Decorator
from clearml.automation.controller import PipelineDecorator
from clearml import TaskTypes
# The actual pipeline execution context
# notice that all pipeline component function calls are actually executed remotely
# Only when a return value is used, the pipeline logic will wait for the component execution to complete
@PipelineDecorator.pipeline(name='custom pipeline logic', project='examples', version='0.0.5')
def media_pipeline(video_file_uri: str):
    """Pipeline entry point; the visible part only prints its argument.

    Args:
        video_file_uri: Printed after the ``'pipeline args:'`` label.
    """
    print('pipeline args:', video_file_uri)
apiVersion: elastic.pytorch.org/v1alpha1
kind: ElasticJob
metadata:
  name: imagenet
  namespace: elastic-job
spec:
  rdzvEndpoint: "etcd-service:2379"
  minReplicas: 1
  maxReplicas: 3
  replicaSpecs:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
 name: efs-claim
 namespace: elastic-job
spec:
 accessModes:
 - ReadWriteMany
 storageClassName: efs-sc
 resources:
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: torchelastic
  region: REGION
iam:
  withOIDC: true
vpc:
  id: VPC_ID
FROM pytorch/pytorch:1.8.1-cuda11.1-cudnn8-runtime
RUN apt-get -q update && apt-get -q install -y wget unzip
RUN pip install torchelastic==0.2.2
### etcd installation - only relevant for testing locally
RUN mkdir ./bin 
COPY elastic/examples/bin/install_etcd ./bin
RUN chmod -R u+x ./bin
RUN ./bin/install_etcd -d ./bin
ENV PATH=/workspace/bin:${PATH}
apiVersion: elastic.pytorch.org/v1alpha1
kind: ElasticJob
metadata:
  name: imagenet
  namespace: elastic-job
spec:
  rdzvEndpoint: "etcd-service:2379"
  minReplicas: 1
  maxReplicas: 2
  replicaSpecs: