# USING PANDAS
import pandas as pd
import os
import logging
logging.basicConfig(level=logging.INFO)
current_dir = os.getcwd()
logging.info('Datasets will be read from current directory %s', current_dir)
from pyspark.sql.functions import *
from pyspark.sql import functions as f
from pyspark.sql import Window
dataframe = (spark.read.option("header", "true").csv("/FileStore/sales_5000000.csv"))
# dataframe.show()

# Drop the unused columns and parse the order date string into a DateType.
df = (dataframe
      .drop("Country", "Sales Channel", "Order ID", "Ship Date", "Units Sold",
            "Unit Price", "Unit Cost", "Total Cost", "Total Profit", "Order Priority")
      .withColumn("Order Date", to_date(col("Order Date"), "M/dd/yyyy")))
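# A small sanity check (my addition, not in the original gist): to_date()
# returns null on a format mismatch, so unparsed order dates show up here.
df.filter(col("Order Date").isNull()).count()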
import boto3
import json
import random
import time
import uuid
from faker import Faker
from dataclasses import dataclass, field
s3_resource = boto3.resource('s3')
#s3_resource.create_bucket(Bucket='data-stream-dump',CreateBucketConfiguration={'LocationConstraint': 'eu-west-2'})
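# A minimal sketch of how these imports could fit together; the record
# fields and S3 key layout are illustrative assumptions, not from the
# original gist:
fake = Faker()

@dataclass
class SalesRecord:
    order_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    country: str = field(default_factory=fake.country)
    units_sold: int = field(default_factory=lambda: random.randint(1, 100))

def put_record(bucket='data-stream-dump'):
    # Serialize one fake record and drop it into the bucket as JSON.
    record = SalesRecord()
    key = 'records/%s.json' % record.order_id
    s3_resource.Object(bucket, key).put(Body=json.dumps(record.__dict__))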
# APPLY MANIPULATION TO CREATE NEW DF
# Note: this starts from the raw `dataframe` read above, since `df` has
# already dropped several of the columns referenced here.
df_final = (dataframe
    .withColumn("order_id", col("Order ID")).drop("Order ID")
    .withColumn("order_date", to_date(col("Order Date"), "M/d/yyyy")).drop("Order Date")
    .withColumn("item_type", col("Item Type")).drop("Item Type")
    .withColumn("sales_channel", col("Sales Channel")).drop("Sales Channel")
    .withColumn("units_sold", col("Units Sold").cast("float")).drop("Units Sold")
    .withColumn("unit_price", col("Unit Price").cast("float")).drop("Unit Price")
    .withColumn("total_revenue", col("Total Revenue").cast("float")).drop("Total Revenue")
    .drop("Region", "Country", "Order Priority", "Ship Date", "Total Profit", "Total Cost", "Unit Cost")
    .distinct())
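# A sketch using the Window import above (my addition, not in the original
# gist): rank item types by total revenue within each sales channel and
# keep the top three per channel.
w = Window.partitionBy("sales_channel").orderBy(f.desc("revenue_sum"))
top_items = (df_final
    .groupBy("sales_channel", "item_type")
    .agg(f.sum("total_revenue").alias("revenue_sum"))
    .withColumn("rank", f.rank().over(w))
    .filter(col("rank") <= 3))
# top_items.show()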
# Example
# Input: nums = [-6, -1, 0, 3, 18]
# Output: [-1, 0, 9, 36, 324]
# Explanation: -6, 0, 3 and 18 are divisible by 3 and are squared.
# After squaring and sorting the list, it becomes [-1, 0, 9, 36, 324].
nums = [-6, -1, 0, 3, 8, 12]

def solution(nums):
    # Square every element divisible by 3, leave the rest as-is, then sort.
    return sorted(n ** 2 if n % 3 == 0 else n for n in nums)
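# Quick check against the list above; the expected output is worked out by
# hand (-6, 0, 3 and 12 are squared, -1 and 8 pass through unchanged):
print(solution(nums))  # [-1, 0, 8, 9, 36, 144]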
### INPUT YOUR CREDENTIALS ###
DATABASE = "database_name"
USER = "your_user_name"
PASSWORD = "your_pw"
HOST = "company_name.eu-west-1.redshift.amazonaws.com"
PORT = "5439"

### CREATE A CONNECTION TO REDSHIFT DB ###
import sqlalchemy as sa
connection_string = "postgresql+psycopg2://%s:%s@%s:%s/%s" % (USER, PASSWORD, HOST, PORT, DATABASE)
engine = sa.create_engine(connection_string)

%%time
# %%time (Jupyter cell magic, must be the first line of its cell) outputs
# the execution time of the statements below.
# fx_revenues is a pandas DataFrame prepared earlier (not shown in this gist).
fx_revenues.to_sql('table_name', con=engine, if_exists='replace', index=False, schema="schema_name")
with engine.begin() as connection:
    connection.execute(sa.text('grant select on schema_name.table_name to your_username;'))
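# Optional verification (my addition, not in the original gist): read the
# row count back through the same engine to confirm the write landed.
check = pd.read_sql('select count(*) from schema_name.table_name', con=engine)
print(check)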
import gspread
from oauth2client.service_account import ServiceAccountCredentials as sac
import pandas as pd

def gsheet2df(spreadsheet_name, sheet_num):
    # Authorize against the Google Sheets/Drive APIs with a service-account key.
    scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
    credentials_path = 'tutorial-sa-b04b423afd77.json'
    credentials = sac.from_json_keyfile_name(credentials_path, scope)
    client = gspread.authorize(credentials)
    # Pull all records from the requested worksheet into a DataFrame.
    sheet = client.open(spreadsheet_name).get_worksheet(sheet_num).get_all_records()
    df = pd.DataFrame.from_dict(sheet)
    return df
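# Example call (my addition; the spreadsheet name is a placeholder and
# sheet_num is the zero-based worksheet index), e.g. to build a frame like
# the fx_revenues used in the Redshift snippet above:
fx_revenues = gsheet2df('your_spreadsheet_name', 0)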
$ git checkout master
Switched to branch 'master'
$ git merge project1/add_commands
Updating b3088e3..cdb97d4
Fast-forward
git_commands.md | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
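The merge above is a fast-forward: master had received no new commits since
project1/add_commands branched off, so Git simply moved the branch pointer
forward. To record an explicit merge commit instead, one option is:

$ git merge --no-ff project1/add_commands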