This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package opa | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"sync" | |
"github.com/open-policy-agent/opa/ast" | |
"github.com/open-policy-agent/opa/rego" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func compareToPrevRunNodes( | |
vaccinationsData map[string]api.VaccinationStatus, | |
currentRun string, | |
prevRunNodes map[string]interface{}, | |
) (map[string][]interface{}, error) { | |
var vaccinationNodes = map[string][]interface{}{ | |
"new": {}, | |
"old": {}, | |
"changed": {}, | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type ArangoDB struct { | |
db driver.Database | |
} | |
type VaccinationEdge struct { | |
From string `json:"_from"` | |
To string `json:"_to"` | |
Collection string `json:"collection"` | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package api | |
import ( | |
"encoding/json" | |
"fmt" | |
"net/http" | |
"time" | |
) | |
type VaccinationStatusEntity struct { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE EXTENSION IF NOT EXISTS aws_lambda CASCADE; | |
CREATE OR REPLACE FUNCTION respond_with_lambda() | |
RETURNS TRIGGER | |
LANGUAGE PLPGSQL | |
AS | |
$$ | |
BEGIN | |
IF cardinality(TG_ARGV)!=2 THEN | |
RAISE EXCEPTION 'Expected 2 parameters to respond_with_lambda function but got %', cardinality(TG_ARGV); | |
ELSEIF TG_ARGV[0]='' THEN |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Built-ins | |
from os import getenv | |
from json import dumps | |
from typing import Any, Dict | |
from http import HTTPStatus | |
# Third party | |
from aws_lambda_powertools.utilities.typing import LambdaContext | |
from aws_lambda_powertools.utilities.parser import event_parser, BaseModel | |
import boto3 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _proxy_http_request_kubernetes_service(service: str, port: int, path: str, namespace: str, | |
method: str, headers: dict, | |
body: Any, timeout: int) -> HTTPResponse: | |
api_client = ApiClient() | |
full_path = f"/api/v1/namespaces/{namespace}/services/{service}:{port}/proxy/{path}" | |
logger.info(f"Sending {method} request to {full_path} with body {body}") | |
response: HTTPResponse = api_client.call_api( | |
resource_path=full_path, | |
method=method, | |
header_params={"Accept": "*/*"}.update(headers), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _authenticate_to_eks_cluster(cluster_endpoint: str, ca_cert_path: str, sts_token: str) -> None: | |
configuration = kube_client.Configuration() | |
configuration.host = cluster_endpoint | |
configuration.verify_ssl = True | |
configuration.debug = False | |
configuration.ssl_ca_cert = ca_cert_path | |
configuration.api_key = {"authorization": "Bearer " + sts_token} | |
kube_client.Configuration.set_default(configuration) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
EKS_CLUSTER_NAME_PARAMETER = 'x-k8s-aws-id' | |
EKS_TOKEN_PREFIX = 'k8s-aws-v1.' | |
STS_URL = 'https://sts.{}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15' | |
DEFAULT_REGION = 'eu-central-1' | |
def _get_bearer_token(cluster_name: str, region: str = DEFAULT_REGION) -> str: | |
session = Session() | |
client = session.client('sts', region_name=region) | |
print(f"Lambda's AWS identity: {client.get_caller_identity()}") |