Skip to content

Instantly share code, notes, and snippets.

View gwbischof's full-sized avatar
🧐
Learning

Garrett Bischof gwbischof

🧐
Learning
View GitHub Profile
@gwbischof
gwbischof / MongoBox.ipynb
Created November 9, 2018 22:05
Really easy way to get a mongodb running, with demonstration of a few PyMongo read/write methods.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
class DocumentBuffer():
# Insert should block if buffer is full
from collections import defaultdict
from threading import Lock
def __init__(self):
self.empty = True
self.AppendBuffer = defaultdict(list)
self.EventBuffer = defaultdict(lambda : defaultdict( lambda : defaultdict(dict)))
from pymongo import InsertOne, DeleteOne, ReplaceOne
from pymongo.errors import BulkWriteError
from pymongo import MongoClient
client = MongoClient('localhost', 27017)
update_test = client['UpdateOne']
table = update_test.table
# drops the table to start fresh
def drop_testdb():
client.drop_database(update_test)
@gwbischof
gwbischof / DocBuffer.py
Created March 28, 2019 14:25
DocBuffer
class DocBuffer():
"""
DocBuffer is a thread-safe "embedding" buffer for bluesky event or datum
documents.
"embedding" refers to combining multiple documents from a stream of
documents into a single document, where the values of matching keys are
stored as a list, or dictionary of lists. "embedding" converts event docs
to event_pages, or datum doc to datum_pages. event_pages and datum_pages
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
import inputs
import time
import threading
class GamePad:
def __init__(self, ldms_button='BTN_TL',rdms_button='BTN_TR',
activate_button='BTN_EAST', status_pv=None):
self._ldms_button = ldms_button
@gwbischof
gwbischof / pdf_gamepad.py
Created June 21, 2019 21:31
pdf gamepad
import inputs
import time
import threading
from epics import caget, caput, cainfo
class GamePad:
def __init__(self, ldms_button='BTN_TL',rdms_button='BTN_TR',
activate_button='BTN_START', status_pv=None):
import itertools
def page_chunks(page, chunk_size, remainder):
array_keys = ['seq_num', 'time', 'uid']
page_size = len(page['uid'])
chunks = [(0,remainder)]
chunks.extend([(i, i + chunk_size) for i
in range(remainder, page_size, chunk_size)])
for start, stop in chunks:
yield {'descriptor': page['descriptor'],
@gwbischof
gwbischof / merge.py
Last active June 27, 2019 18:01
merge xarrays
import xarray
import string
import random
from functools import partial
def xevents(numcol, numrows):
stream_key = 'descriptor'
coord_key = 'time'
array_keys = ['seq_num', 'time', 'uid']
dataframe_keys = ['data', 'timestamps', 'filled']