Timothy Mugayi (timothymugayi) - GitHub Gists

import os
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker=os.environ.get('REDIS_URL', 'redis://localhost:6379/0'))

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Illustrative completion; the gist cuts off after the decorator.
    # latest_bitcoin_price is assumed to be an @app.task in this module.
    sender.add_periodic_task(crontab(minute='*/10'), latest_bitcoin_price.s('usd'))
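# A hedged usage note: assuming this module lives at celery_tasks/tasks.py, the
# schedule above would be exercised by running a worker plus the beat scheduler:
#   celery -A celery_tasks.tasks worker --loglevel=info
#   celery -A celery_tasks.tasks beat --loglevel=info
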
from celery_tasks.tasks import latest_bitcoin_price
import time

if __name__ == '__main__':
    result = latest_bitcoin_price.delay('usd')
    # Polling right after delay() would almost always print False, so give a
    # worker a moment to pick the task up first.
    time.sleep(5)
    print('Task finished? {}'.format(result.ready()))
    print('Task result: {}'.format(result.result))

import os
import json
import requests
from celery import Celery

app = Celery('tasks',
             broker=os.environ.get('REDIS_URL', 'redis://localhost:6379/0'),
             # Store results in the same Redis instance so .result works above.
             backend=os.environ.get('REDIS_URL', 'redis://localhost:6379/0'))
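
# The task body is not shown in this gist; below is a minimal sketch of what
# the json/requests imports suggest. The CoinDesk endpoint and its response
# shape are assumptions for illustration, not taken from the original.
@app.task
def latest_bitcoin_price(currency):
    resp = requests.get(
        'https://api.coindesk.com/v1/bpi/currentprice/{}.json'.format(currency))
    resp.raise_for_status()
    # Assumed response shape: {"bpi": {"USD": {"rate": "27,123.45", ...}}}
    rate = resp.json()['bpi'][currency.upper()]['rate']
    return json.dumps({'currency': currency, 'rate': rate})
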
import os
import pathlib
import time
import urllib.request
import zipfile
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from memory_profiler import profile
from tqdm import tqdm
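
# The gist body is not shown; this is a minimal sketch assuming the imports
# above back a download-then-aggregate benchmark on a local Dask cluster. The
# URL, file names, and the 'value' column are placeholders, not the original's.
DATA_URL = 'https://example.com/data.zip'  # placeholder
ZIP_PATH = pathlib.Path('data.zip')
CSV_DIR = pathlib.Path('data')

def download(url, dest):
    # Stream the download, reporting progress through tqdm.
    with tqdm(unit='B', unit_scale=True, desc=dest.name) as bar:
        def hook(block_num, block_size, total_size):
            if total_size > 0:
                bar.total = total_size
            bar.update(block_size)
        urllib.request.urlretrieve(url, dest, reporthook=hook)

@profile  # memory_profiler prints line-by-line memory usage of this function
def aggregate():
    df = dd.read_csv(str(CSV_DIR / '*.csv'))
    return df['value'].mean().compute()

if __name__ == '__main__':
    client = Client(LocalCluster())
    if not ZIP_PATH.exists():
        download(DATA_URL, ZIP_PATH)
        with zipfile.ZipFile(ZIP_PATH) as zf:
            zf.extractall(CSV_DIR)
    start = time.time()
    print(aggregate())
    print('took {:.2f}s'.format(time.time() - start))
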
import os
import uuid
import pathlib
import zipfile
import pandas as pd
import urllib.request
from tqdm import tqdm
from memory_profiler import profile
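
# Again the body is not shown; a sketch of the pandas counterpart, assuming the
# same benchmark done with chunked reads so the file never sits fully in
# memory. The URL, scratch directory, and column name are placeholders.
DATA_URL = 'https://example.com/data.zip'  # placeholder
WORK_DIR = pathlib.Path(str(uuid.uuid4()))  # unique scratch directory per run

def download(url, dest):
    with tqdm(unit='B', unit_scale=True, desc=dest.name) as bar:
        def hook(block_num, block_size, total_size):
            if total_size > 0:
                bar.total = total_size
            bar.update(block_size)
        urllib.request.urlretrieve(url, dest, reporthook=hook)

@profile  # memory_profiler reports per-line memory usage
def aggregate(csv_path):
    total, count = 0.0, 0
    for chunk in pd.read_csv(csv_path, chunksize=100_000):
        total += chunk['value'].sum()  # 'value' is a placeholder column
        count += len(chunk)
    return total / count

if __name__ == '__main__':
    WORK_DIR.mkdir(exist_ok=True)
    zip_path = WORK_DIR / 'data.zip'
    download(DATA_URL, zip_path)
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(WORK_DIR)
    print(aggregate(next(WORK_DIR.glob('*.csv'))))
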
import json
import os
import time
import boto3
# A partition key must be supplied with every record you put, but it only
# affects routing when the data stream has more than one shard. Kinesis
# computes the MD5 hash of the partition key to decide which shard stores the
# record (if you describe the stream you'll see each shard's hash-key range in
# the shard description).
#
# Each shard accepts at most 1,000 records and/or 1 MB per second, so if you
# write to a single shard faster than that you'll get a
# ProvisionedThroughputExceededException. Multiple shards scale this limit:
# 8 shards give you 8,000 records and/or 8 MB per second.
#
import calendar
import json
import os
import random
import time
import boto3
from datetime import datetime
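
# Producer sketch inferred from the imports above: generate a synthetic record
# every second and put it on the stream. The stream name and payload fields are
# placeholders, not taken from the original.
STREAM_NAME = os.environ.get('KINESIS_STREAM', 'my-data-stream')

kinesis = boto3.client('kinesis')

def make_record():
    now = datetime.utcnow()
    return {
        'event_id': random.randint(1, 1_000_000),
        'price': round(random.uniform(1.0, 100.0), 2),
        'ts': calendar.timegm(now.timetuple()),
    }

if __name__ == '__main__':
    while True:
        record = make_record()
        # The partition key's MD5 hash picks the shard, so a high-cardinality
        # key spreads records evenly across shards.
        kinesis.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(record).encode('utf-8'),
            PartitionKey=str(record['event_id']),
        )
        time.sleep(1)
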
import os
import sys
import magic
import boto3
import threading
from pathlib import Path
from boto3.s3.transfer import TransferConfig
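
# A sketch of what these imports typically assemble: a multipart S3 upload with
# a thread-safe progress callback. The bucket name is a placeholder and the
# 25 MB part size is illustrative.
class ProgressPercentage(object):
    # boto3 calls this from its worker threads as bytes are transferred.
    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        with self._lock:
            self._seen_so_far += bytes_amount
            pct = (self._seen_so_far / self._size) * 100
            sys.stdout.write('\r%s  %s / %s  (%.2f%%)' % (
                self._filename, self._seen_so_far, self._size, pct))
            sys.stdout.flush()

if __name__ == '__main__':
    file_path = sys.argv[1]
    # Files above the threshold are split into 25 MB parts on 10 threads.
    config = TransferConfig(multipart_threshold=1024 * 1024 * 25,
                            max_concurrency=10,
                            multipart_chunksize=1024 * 1024 * 25,
                            use_threads=True)
    s3 = boto3.client('s3')
    s3.upload_file(file_path, 'my-bucket', Path(file_path).name,
                   ExtraArgs={'ContentType': magic.from_file(file_path, mime=True)},
                   Config=config,
                   Callback=ProgressPercentage(file_path))
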
import time
from timeloop import Timeloop
from datetime import timedelta

tl = Timeloop()

@tl.job(interval=timedelta(seconds=300))
def train_model():
    print("call Dask cluster 300s job current time : {}".format(time.ctime()))

if __name__ == '__main__':
    # block=True keeps the scheduler running in the foreground.
    tl.start(block=True)

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger

scheduler = BlockingScheduler()

@scheduler.scheduled_job(IntervalTrigger(hours=3))
def train_model():
    print('dask train_model! The time is: %s' % datetime.now())

if __name__ == '__main__':
    # start() blocks and runs the scheduled jobs in the foreground.
    scheduler.start()