Skip to content

Instantly share code, notes, and snippets.

View anandtripathi5's full-sized avatar
🎯
Focusing

Anand Tripathi anandtripathi5

🎯
Focusing
View GitHub Profile
@anandtripathi5
anandtripathi5 / 12_factor_app_checklist_explained.md
Last active December 3, 2022 13:24
The 12 Factor App checklist explained

The twelve-factor app Checklist Explained

| ✓ | Factors | Status | Remarks | |----|-----------------------------------------------

@anandtripathi5
anandtripathi5 / pyspark_to_elasticsearch.py
Created September 2, 2022 14:59 — forked from adrianva/pyspark_to_elasticsearch.py
Save RDD and/or DataFrame from Spark to Elasticsearch
# Elastic configs
es_read_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : "twitter/tweet"
}
es_write_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
from locust import HttpUser, task, between
from random import randint
class AwesomeApplication(HttpUser):
wait_time = between(1, 5)
@task
def hello(self):
self.client.get("/hello")
from flask import Flask, request, jsonify
from flask_jwt_extended import create_access_token
from flask_jwt_extended import get_jwt_identity
from flask_jwt_extended import jwt_required
from flask_jwt_extended import JWTManager
app = Flask('__name__')
# Setup the Flask-JWT-Extended extension
app.config["JWT_SECRET_KEY"] = "super-secret" # Change this!
jwt = JWTManager(app)
from locust import HttpUser, task
from random import randint
class AwesomeApplication(HttpUser):
@task
def hello(self):
self.client.get("/hello")
@task
def world(self):
@anandtripathi5
anandtripathi5 / flask-2-0.py
Last active April 1, 2022 11:20
This is a minimal example of Flask 2.0
from flask import Flask
app = Flask('__name__')
@app.get('/hello')
@app.post('/hello')
def hello():
return 'This is the awesome hello api request'
from flask import Flask
from extensions import socketio
def create_app():
app = Flask(__name__)
# other initializations and configurations
...
import celery as celery
from extensions import socketio
@celery.task(name="some_task", bind=True)
def some_task(self, room_name=None, namespace=None):
# Do some task
# Send a event to room
socketio.emit(
from flask_socketio import emit, join_room
from extensions import db
from base import BaseSocket
class Foo(BaseSocket):
def on_get_foo_details(self, **kwargs):
try:
from extensions import socketio
from resources.foo import Foo
def socket_init():
socketio.on_namespace(Foo('/foo'))