Skip to content

Instantly share code, notes, and snippets.

View Ananto30's full-sized avatar
🎯
Focusing

Azizul Haque Ananto Ananto30

🎯
Focusing
View GitHub Profile
@Ananto30
Ananto30 / python_redis_pubsub.py
Created January 27, 2019 18:26
An example of Redis pub/sub in Python using asyncio_redis and async python programming
import asyncio_redis
import asyncio
channels = ['channel1', 'weather']
async def start():
connection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
subscriber = await connection.start_subscribe()
await subscriber.subscribe(channels)
@Ananto30
Ananto30 / sse.go
Last active May 11, 2024 08:44
SSE message stream in Go
// Example SSE server in Golang.
// $ go run sse.go
// Inspired from https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c
package main
import (
"encoding/json"
"fmt"
"log"
@Aspect
@Slf4j
public class LoggerAspect {
@Around("@annotation(Loggable)")
public Object logAround(ProceedingJoinPoint joinPoint) throws Throwable {
long start = System.currentTimeMillis();
var result = joinPoint.proceed();
public abstract class AbstractWebClient {
private static final String MIME_TYPE = "application/json";
private final WebClient webClient;
public AbstractWebClient(String clientUrl) {
this.webClient = WebClient.builder()
.baseUrl(clientUrl)
.defaultHeader(HttpHeaders.CONTENT_TYPE, MIME_TYPE)
.build();
@Component
@Slf4j
public class TraceIdFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
Map<String, String> headers = exchange.getRequest().getHeaders().toSingleValueMap();
return chain.filter(exchange)
.subscriberContext(context -> {
@Component
@Slf4j
public class TraceIdFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
Map<String, String> headers = exchange.getRequest().getHeaders().toSingleValueMap();
var traceId = "";
if (headers.containsKey("X-B3-TRACEID")) {
@Ananto30
Ananto30 / python_singleton.py
Created March 22, 2020 12:30
Singleton implementation in Python
class SingletonMeta(type):
_instance = None
def __call__(self):
if self._instance is None:
self._instance = super().__call__()
return self._instance
import msgpack
class Btypes:
def pack(self):
return msgpack.packb(Btypes.get_all_vars(self))
@classmethod
def unpack(cls, d):
return cls(**msgpack.unpackb(d, raw=False))
from app.serializers.serialization_types import Btypes
class OrderStatus:
INITIATED = 0
PACKING = 1
SHIPPED = 2
DELIVERED = 3
from ..serialization_types import Etypes, type_check
from .serializer import CreateOrderReq
class OrderOperations:
CREATE_ORDER = 'create_order'
GET_ORDER = 'get_order'
class CreateOrder(Etypes):