@linar-jether
linar-jether / PySpark DataFrame from many small pandas DataFrames.ipynb
Created Jul 8, 2018
Convert an RDD of pandas DataFrames to a single Spark DataFrame using Arrow, without collecting all data on the driver.
linar-jether / dask_celery_scheduler.py
Created Mar 18, 2018
A dask distributed scheduler based on Celery tasks; allows reusing an existing Celery cluster for ad-hoc computation
from __future__ import absolute_import, division, print_function
import multiprocessing
import pickle
from multiprocessing.pool import ThreadPool
from celery import shared_task
from dask.local import get_async # TODO: get better get
from dask.context import _globals
from dask.optimize import fuse, cull
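The gist plugs Celery into dask's `get_async` machinery; as a rough local sketch of the same scheduling idea (not the gist's actual code), the following walks a dask-style graph dict and hands each task to a `submit` callable. Here a `ThreadPoolExecutor` stands in for a Celery `apply_async`; all names are illustrative, and real dask also topologically orders and parallelises independent tasks.

```python
from concurrent.futures import ThreadPoolExecutor
from operator import add

def execute_graph(dsk, key, submit):
    """Naively resolve `key` in a dask-style graph {key: (func, *args)},
    running each task through `submit` (anything returning a
    future-like object with .result())."""
    cache = {}
    def resolve(k):
        if k in cache:
            return cache[k]
        task = dsk[k]
        if isinstance(task, tuple) and callable(task[0]):
            func, *args = task
            # Arguments that are graph keys are resolved recursively.
            vals = [resolve(a) if isinstance(a, str) and a in dsk else a
                    for a in args]
            cache[k] = submit(func, *vals).result()
        else:
            cache[k] = task  # literal value
        return cache[k]
    return resolve(key)

pool = ThreadPoolExecutor(4)
dsk = {'x': 1, 'y': 2, 'z': (add, 'x', 'y'), 'w': (add, 'z', 10)}
result = execute_graph(dsk, 'w', pool.submit)  # (1 + 2) + 10
```

Swapping `pool.submit` for a wrapper around a Celery task's `apply_async` is essentially what reusing an existing cluster for ad-hoc computation amounts to.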
linar-jether / celery_task_monitor.py
Created Jun 22, 2017
Celery task monitor that logs task state to MongoDB
import pickle
import threading
try:
    from queue import Queue  # Python 3
except ImportError:
    from Queue import Queue  # Python 2
import time
from bson import InvalidDocument
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
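A sketch of the kind of transform such a monitor needs before inserting into MongoDB (the function name is hypothetical): Mongo field names cannot contain dots, and the gist's `InvalidDocument` import suggests unencodable payloads are handled separately, e.g. by pickling them as a fallback.

```python
def event_to_document(event):
    """Flatten a Celery task event dict into a MongoDB-ready document:
    rewrite illegal '.'-containing field names and derive a state
    label from the event type ('task-succeeded' -> 'SUCCEEDED')."""
    doc = {k.replace('.', '_'): v for k, v in event.items()}
    doc['state'] = event.get('type', '').replace('task-', '').upper()
    return doc

doc = event_to_document({'type': 'task-succeeded', 'uuid': 'abc'})
```

Values BSON cannot encode would raise `InvalidDocument` on insert, which is the point where a pickle fallback can kick in.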
linar-jether / simple_python_datasource.py
Last active May 22, 2022
Grafana Python datasource, using pandas for timeseries and table data. Inspired by and compatible with the Simple JSON datasource. Up-to-date version maintained at https://github.com/panodata/grafana-pandas-datasource
from flask import Flask, request, jsonify, json, abort
from flask_cors import CORS, cross_origin
import pandas as pd
app = Flask(__name__)
cors = CORS(app)
app.config['CORS_HEADERS'] = 'Content-Type'
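For the timeseries half, the Simple JSON datasource protocol expects query responses of the form `{"target": ..., "datapoints": [[value, unix_ms], ...]}`; a sketch of shaping a pandas Series with a DatetimeIndex into that payload (the function name is illustrative):

```python
import pandas as pd

def series_to_datapoints(series, target):
    """Shape a pandas Series (DatetimeIndex) into a Simple JSON
    datasource timeserie: [value, unix timestamp in milliseconds]
    pairs under the queried target name."""
    datapoints = [[float(v), int(ts.timestamp() * 1000)]
                  for ts, v in series.items()]
    return {'target': target, 'datapoints': datapoints}

s = pd.Series([1.0, 2.0],
              index=pd.to_datetime(['2022-01-01', '2022-01-02']))
payload = series_to_datapoints(s, 'cpu')
```

A Flask `/query` handler would build one such dict per requested target and return them as a JSON list.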
linar-jether / dynamic_tasks.py
Last active Mar 16, 2022
Dynamic Celery tasks: remote execution of arbitrary callables and DAGs, using dill to serialize executable code and send it to the worker. Also shows how to map an iterable returned from one task to a group of tasks (a distributed map), with an optional reducer (chord) executed when the group's tasks complete.
# Task primitives, allows pipeline execution using celery
@app.task
def dmap(it, callback, final=None):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    # Hack for mapping a chain to values, due to a bug where args are not copied in group creation
    if isinstance(callback, chain):
        if final:
            raise ValueError('task_processor: Cannot run reducer for dmap executed with a chain.')
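Ignoring the Celery machinery, `dmap`'s semantics reduce to an ordinary map with an optional reducer; a plain-Python stand-in (local sketch, not the distributed version):

```python
def dmap_local(it, callback, final=None):
    """Local sketch of dmap's semantics: in the gist, each callback
    application becomes a subtask in a Celery group (running in
    parallel on workers), and `final` becomes a chord reducer invoked
    once every group task has completed."""
    results = [callback(x) for x in it]
    return final(results) if final is not None else results
```

Usage mirrors the distributed form: `dmap_local(range(3), double)` for the group, `dmap_local(range(3), double, sum)` for group plus chord.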
linar-jether / app_engine_remote_api.py
Created Sep 10, 2015
Use App Engine services via the remote_api; works locally and in an IPython shell
import getpass
import sys
sys.path.insert(0,r"C:\Program Files (x86)\Google\google_appengine")
sys.path.insert(0,r"C:\Program Files (x86)\Google\google_appengine\lib\yaml\lib")
sys.path.insert(0,r"C:\Program Files (x86)\Google\google_appengine\lib\fancy_urllib")
from google.appengine.ext.remote_api import remote_api_stub
from google.appengine.ext import db
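The usual next step after these imports is `ConfigureRemoteApi`, the SDK's documented entry point for attaching local stubs to a deployed app. A sketch, with the credential callback reading from environment variables (the variable names and both helper functions are illustrative):

```python
import getpass
import os

def auth_func():
    # Credential callback that remote_api_stub invokes when the server
    # requests authentication; environment variable names are illustrative.
    return (os.environ['GAE_EMAIL'],
            os.environ.get('GAE_PASSWORD') or getpass.getpass())

def connect(app_id, host):
    # ConfigureRemoteApi wires App Engine API calls (db, memcache, ...)
    # through the remote endpoint; the import is deferred so the helper
    # above stays usable without the SDK on sys.path.
    from google.appengine.ext.remote_api import remote_api_stub
    remote_api_stub.ConfigureRemoteApi(app_id, '/_ah/remote_api',
                                       auth_func, host)
```

After `connect('your-app-id', 'your-app-id.appspot.com')`, ordinary `db` queries in the shell run against the deployed datastore.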