This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import pytest | |
from lambda_function import * | |
class MockBotoSession: | |
class Session: | |
def client(self, service_name, region_name): | |
# create a flexible Client class for any boto3 client | |
class Client: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import json | |
import pytest | |
from moto import mock_dynamodb2 | |
from lambda_function import * | |
@pytest.fixture | |
def test_sqs_event(): | |
test_sqs_event = { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import imaplib | |
from bs4 import BeautifulSoup | |
from collections import defaultdict | |
def run(): | |
imap_url = 'imap.gmail.com' | |
con = imaplib.IMAP4_SSL(imap_url) | |
con.login('youremail@gmail.com','exampleapppassword') | |
con.select('"INBOX"') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
log_group = f'/aws/lambda/{lambda_function_name}' | |
query = "fields @requestId | " \ | |
"sort @timestamp desc | limit 3 | " \ | |
"filter @message like 'ERROR'" | |
time.sleep(180) # wait for logs to be available. | |
client = boto3.client('logs') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
query = f"fields @timestamp, @message | " \ | |
"sort @timestamp asc | limit 25 | " \ | |
"filter @requestId = '{request_id}'" | |
client = boto3.client('logs') | |
start_query_response = client.start_query( | |
logGroupName=log_group, | |
startTime=int((datetime.now() - timedelta(minutes=15)).timestamp()), | |
endTime=int(datetime.now().timestamp()), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import requests | |
df = pd.read_csv('promos.csv') | |
for row in df.items(): | |
user_id = row['user_id'] | |
url = f'http://internal.services.com/promotions?user_id={user_id}' | |
r = requests.post(url) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import requests | |
df = pd.read_csv('promos.csv') | |
for i, row in df.iterrows(): | |
if i % 100 == 0: | |
print(f'Working on row {i}') | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import requests | |
import time | |
df = pd.read_csv('promos.csv') | |
for i, row in df.iterrows(): | |
time.sleep(1) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import requests | |
error_list = [] | |
df = pd.read_csv('promos.csv') | |
for i, row in df.iterrows(): | |
if i % 100 == 0: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import boto3 | |
df = pd.read_csv(file_path) | |
dynamo = boto3.resource('dynamodb') | |
table = dynamo.Table('data-services-dev-testing') | |
with table.batch_writer() as bw: | |
for i, record in enumerate(df.to_dict("records")): |