Skip to content

Instantly share code, notes, and snippets.

@yogesh-aggarwal
Last active October 21, 2019 07:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yogesh-aggarwal/40dcd3650e7bb5851f1dd100b790c6e7 to your computer and use it in GitHub Desktop.
Save yogesh-aggarwal/40dcd3650e7bb5851f1dd100b790c6e7 to your computer and use it in GitHub Desktop.
"""
CMS: Content Management System
Manages content for a blogging website that has many posts & needs its content updated on a daily basis.
This project is based on a functional approach & is specifically designed for processing large amounts of data.
The project supports all types of CRUD operations.
-- Note: Designed for School project --
Name: Yogesh
Class: XII-B
Roll. no.: 34
"""
from datetime import datetime
from os import getpid as pid, remove
from webbrowser import open_new_tab as web
from hashlib import sha512
from colorama import init
# Initialise colorama before any coloured output is printed.
# NOTE(review): presumably this makes the ANSI escapes emitted by sPrint work
# on Windows consoles -- confirm against the colorama documentation.
init()
# File name used when the user just presses Enter at the database prompt.
default = "data.msp"
# Line that opens and closes the schema header written by schemaSetup.
schemaSeperate = "^" * 100
# Line written before the first post and after every post in the database file.
dataSeperate = "%" * 100
# When True, fetchData prints a warning if the stored password and hash
# disagree; disbleWarning sets it to False for the rest of the session.
warning = True
# The database path is asked for as soon as the module is executed/imported.
datab = input(f"Database ({default}): ")
# Path of the database file used by every command; falls back to `default`.
database = datab if datab else default
def sPrint(s, fg, bg):
    """
    Print ``s`` wrapped in an ANSI colour escape sequence.

    ``fg`` and ``bg`` are placed verbatim after the leading ``1`` attribute of
    the sequence, and the attributes are reset again right after the text.
    """
    start = f"\033[1;{fg};{bg}m"
    reset = "\033[0m"
    print(f"{start}{s}{reset}")
def create(flag="", execUser=True):
    """
    Creates entry in database.

    Prompts for title, content, date and author (a blank answer is stored as
    the placeholder word "Title", "Content", "Date" or "Author"), appends the
    new post to the posts returned by ``fetchData`` and rewrites the database
    file as: schema header, then every post framed by ``dataSeperate`` lines.

    ``flag`` and ``execUser`` are accepted so the function can be invoked
    through ``keyMap`` like every other command; they are not used here.

    Nothing is raised to the caller: Ctrl+C / end of input cancels the
    operation, and any other failure is reported on the console.
    """
    try:
        data = fetchData()
    except Exception as error:
        # Without the existing posts the rewrite below would drop them, so
        # give up instead of continuing with an undefined ``data``.
        print(f"Could not read the database: {error}")
        return

    labels = ("Title", "Content", "Date", "Author")
    try:
        newData = []
        for label in labels:
            answer = input(f"{label}: ").strip()
            newData.append(answer if answer else label)
        data.append(newData)

        # Refresh the "Last login" line once, then keep the header exactly as
        # it now is on disk (the slice bounds come from schemaSetup).
        lower, upper = schemaSetup(updateTime=True)
        with open(database) as f:
            header = f.readlines()[lower : upper + 1]

        # Build the whole body before the file is truncated for writing.
        body = [f"{dataSeperate}\n"]
        for post in data:
            body.extend(f"{post[i]}\n" for i in range(4))
            body.append(f"{dataSeperate}\n")

        with open(database, "w+") as f:
            f.writelines(header + body)
    except (KeyboardInterrupt, EOFError):
        print(f"\nOperation cancelled (was {pid()})")
    except Exception as error:
        print(f"Could not create the record: {error}")
    else:
        print(f"Successfully created the record! (was {pid()})")
def show(block=True, flag="", execUser=True):
    """
    Displays the existing entries in the database.

    Posts are printed in order, numbered from 1. When ``flag`` is an integer
    the listing pauses with a "-- More -- " prompt after every ``flag`` posts:
    pressing Enter continues, any other answer (or Ctrl+C) stops. With the
    default ``flag=""`` nothing ever compares equal, so there is no pause.

    ``block`` is kept for backward compatibility only. It used to start a new
    ``main()`` loop after the listing, nesting one command loop per call; the
    caller's own loop simply continues once this function returns, so the
    argument is now ignored. ``execUser`` is unused.
    """
    data = fetchData()
    if not data:
        print("No posts to show!")
        return

    shown = 0  # posts printed since the last pause
    for i, x in enumerate(data):
        if shown == flag:
            # Page is full: ask first, then print the current post so that
            # the post sitting on the page boundary is not skipped.
            try:
                if input("-- More -- ") != "":
                    break
            except (KeyboardInterrupt, EOFError):
                break
            shown = 0
        print(
            f"|</>| POST {1+i} |</>|\n\tTitle: {x[0]}\n\tContent: {x[1]}\n\tDate published: {x[2]}\n\tAuthor: {x[3]}"
        )
        shown += 1
def view(flag="", execUser=True):
    """
    Creates a post view for an existing post (provided the index).

    When ``execUser`` is true and ``flag`` is a valid 1-based post number,
    that single post is printed. Otherwise the call falls back to listing the
    posts through ``show`` with ``flag`` as the page size.

    Any error while doing so is reported as "No posts to show".
    """
    data = fetchData()
    if not data:
        print("No posts to show!")
        return
    try:
        # The lower bound keeps 0/negative numbers from silently wrapping
        # around to posts counted from the end of the list.
        if execUser and 1 <= flag <= len(data):
            post = data[flag - 1]
            print(
                f"</>|Post View|</>\n\tTitle: {post[0]}\n\tContent: {post[1]}\n\tDate: {post[2]}\n\tAuthor: {post[3]}"
            )
        else:
            # block=False: return here instead of starting another command loop.
            show(block=False, flag=flag)
    except Exception:
        print("No posts to show")
def update(flag="", execUser=True):
    """
    Updates the value for a post in the database.

    ``flag`` is the 1-based post number when the user typed one
    (``execUser`` true). When ``execUser`` is false the caller did not supply
    a number (``main`` then passes a placeholder in ``flag``), so the posts
    are listed and the number is asked for. An out-of-range ``flag`` is also
    asked for again.

    Each field is prompted with its current value; a blank answer keeps it.
    The database file is rewritten only if something actually changed.

    Nothing is raised to the caller: Ctrl+C / end of input cancels, a bad
    post number prints "Invalid input!", other failures are reported.
    """
    try:
        data = fetchData()
    except Exception as error:
        print(f"Could not read the database: {error}")
        return
    if not data:
        print("No posts to update!")
        return

    if not execUser:
        try:
            show(block=False, flag=20)
        except Exception:
            # The listing is only a convenience; still allow the update.
            pass

    labels = ("Title", "Content", "Date", "Author")
    try:
        index = flag
        # Only trust ``flag`` as a post number when the user really typed it.
        if not execUser or not isinstance(flag, int) or not 1 <= flag <= len(data):
            index = int(input("\nPost number (Ctrl+C to cancel): "))
        if not 1 <= index <= len(data):
            # Reject 0/negative numbers instead of wrapping around the list.
            raise IndexError(index)

        post = data[index - 1]
        newData = []
        for i, label in enumerate(labels):
            answer = input(f"{label} ({post[i]}): ").strip()
            newData.append(answer if answer else post[i])

        if newData == post:
            print("No data to be updated!\n")
            return

        data[index - 1] = newData
        # Refresh "Last login" once and keep the header as it now is on disk.
        lower, upper = schemaSetup(updateTime=True)
        with open(database) as f:
            header = f.readlines()[lower : upper + 1]

        # Build the whole body before the file is truncated for writing.
        body = [f"{dataSeperate}\n"]
        for x in data:
            body.extend(f"{x[i]}\n" for i in range(4))
            body.append(f"{dataSeperate}\n")

        with open(database, "w+") as f:
            f.writelines(header + body)
    except (KeyboardInterrupt, EOFError):
        print(f"\nOperation cancelled (was {pid()})")
    except (ValueError, IndexError):
        print("Invalid input!")
    except Exception as error:
        print(f"Could not update the record: {error}")
    else:
        print(f"Successfully updated the record! (was {pid()})")
def delete(flag="", execUser=True):
    """
    Deletes entry in database.

    ``flag`` is the 1-based post number when the user typed one
    (``execUser`` true). When ``execUser`` is false the caller did not supply
    a number (``main`` then passes a placeholder in ``flag``), so the posts
    are listed and the number is asked for until a valid one is entered.

    The post is removed and the file rewritten only after the user answers
    the confirmation prompt with something starting with "y".

    Nothing is raised to the caller: Ctrl+C / end of input cancels, and any
    other failure is reported on the console.
    """
    try:
        data = fetchData()
    except Exception as error:
        print(f"Could not read the database: {error}")
        return
    if not data:
        print("No posts to delete!")
        return

    if not execUser:
        try:
            show(block=False, flag=20)
        except Exception:
            # The listing is only a convenience; still allow the delete.
            pass

    try:
        index = None
        # Only trust ``flag`` as a post number when the user really typed it.
        if execUser and isinstance(flag, int) and 1 <= flag <= len(data):
            index = flag
        while index is None:
            try:
                candidate = int(input("\nPost number (Ctrl+C to cancel): "))
            except ValueError:
                print("Invalid input!")
                continue
            if 1 <= candidate <= len(data):
                index = candidate
            else:
                # 0/negative numbers must not wrap around to the list's end.
                print("Invalid input!")

        confirm = input("Are you sure? (Y/n): ").strip().lower()
        if not confirm.startswith("y"):
            print(f"Delete operation cancelled (was {pid()})")
            return

        # Remove the post only once the user has confirmed.
        data.pop(index - 1)

        # Refresh "Last login" once and keep the header as it now is on disk.
        lower, upper = schemaSetup(updateTime=True)
        with open(database) as f:
            header = f.readlines()[lower : upper + 1]

        # Build the whole body before the file is truncated for writing.
        body = [f"{dataSeperate}\n"]
        for x in data:
            body.extend(f"{x[i]}\n" for i in range(4))
            body.append(f"{dataSeperate}\n")

        with open(database, "w+") as f:
            f.writelines(header + body)
    except (KeyboardInterrupt, EOFError):
        print(f"\nOperation cancelled (was {pid()})")
    except Exception as error:
        print(f"Could not delete the record: {error}")
    else:
        print("Successfully deleted the record!")
def count(flag="", execUser=True):
    """
    Report how many posts ``fetchData`` currently returns.

    ``flag`` and ``execUser`` exist only so the function matches the calling
    convention used by ``keyMap``; neither is read.
    """
    total = len(fetchData())
    print(f"There are {total} posts in the database\n")
def export(flag="", execUser=True):
    """
    Helps in exporting post to different format.

    Asks for the export format (HTML, JSON or plain text), then for the post
    number and the output file name, and writes that single post to the file.
    The HTML export is additionally opened in a new browser tab.

    ``flag`` is unused. When ``execUser`` is false the posts are listed
    before the post number is asked for.

    Nothing is raised to the caller: Ctrl+C / end of input at any prompt
    cancels the export, and write failures are reported on the console.
    """
    # Local imports under private names: the nested exporters below are
    # called ``json`` and ``html`` and would otherwise shadow the modules.
    import json as _json
    from html import escape as _escape

    def cancelled():
        """Tell the user that the export was abandoned."""
        print(f"\nOperation cancelled (was {pid()})")

    def setupData(extension):
        """
        Ask which post to export and where to write it.

        Returns ``(file, post)``, or ``None`` when the user cancels or refuses
        to overwrite an existing file.
        """
        data = fetchData()
        if data and not execUser:
            try:
                show(block=False, flag=20)
            except Exception:
                # The listing is only a convenience; still allow the export.
                pass
        try:
            while True:
                try:
                    index = int(input("\nPost possition (Ctrl+C to cancel): "))
                except ValueError:
                    print("Invalid input!")
                    continue
                if 1 <= index <= len(data):
                    post = data[index - 1]
                    break
                # 0/negative numbers must not wrap around to the list's end.
                print("Invalid input!")

            file = ""
            while not file:
                file = input("File name: ")
            if not file.endswith(extension):
                file += extension

            # The file counts as existing when it can be opened for reading.
            try:
                with open(file):
                    exists = True
            except OSError:
                exists = False
            if exists:
                choice = input("File already exists, overite (Y/n): ")
                if not choice.strip().lower().startswith("y"):
                    cancelled()
                    return None
                remove(file)
        except (KeyboardInterrupt, EOFError):
            cancelled()
            return None
        return file, post

    def json():
        """
        Exports to JSON.
        """
        selection = setupData(".json")
        if selection is None:
            return
        file, post = selection
        record = dict(zip(["title", "content", "date", "author"], post))
        # json.dumps escapes quotes/backslashes so the file is always valid
        # JSON; ensure_ascii=False keeps non-ASCII text as typed.
        with open(file, "w") as f:
            f.write(_json.dumps(record, ensure_ascii=False))
        print(f"Data was successfully written to {file} (was {pid()})")

    def html():
        """
        Exports to HTML.
        """
        selection = setupData(".html")
        if selection is None:
            return
        file, post = selection
        # Escape the post text so characters such as < and & cannot break
        # (or inject markup into) the generated page.
        title, content, date, author = [_escape(post[i]) for i in range(4)]
        data = (
            f"<html><head><title>{title}</title>"
            '<style>* {font-family: "Verdana", sans-serif} '
            "h5 {text-decoration: underline}</style></head>"
            f"<body><h1>{title}</h1><p>{content}</p>"
            f"<h5>Date created: {date}</h5><h5>By {author}</h5></body></html>"
        )
        with open(file, "w") as f:
            f.write(data)
        web(file)
        print(f"Data was successfully written to {file} (was {pid()})")

    def text():
        """
        Exports to a plain text file.
        """
        selection = setupData(".txt")
        if selection is None:
            return
        file, post = selection
        data = f"Title: {post[0]}\nContent: {post[1]}\nDate: {post[2]}\nAuthor: {post[3]}\n"
        with open(file, "w") as f:
            f.write(data)
        print(f"Data was successfully written to {file} (was {pid()})")

    if not fetchData():
        print("No data to export...")
        return

    print("Available formats are:\n\t1) HTML\n\t2) JSON\n\t3) Text\n")
    exporters = {"1": html, "2": json, "3": text}
    while True:
        try:
            exportType = input("Type (position) (Ctrl+C to cancel): ")
        except (KeyboardInterrupt, EOFError):
            cancelled()
            break
        if not exportType:
            continue
        exporter = exporters.get(exportType.strip())
        if exporter is None:
            print("Invalid input!")
            continue
        try:
            exporter()
        except KeyboardInterrupt:
            cancelled()
        except Exception as error:
            print(f"Could not export the post: {error}")
        break
def cmsQuit(flag="", execUser=True):
    """
    Print a goodbye message and leave the program with exit code 0.

    ``flag`` and ``execUser`` exist only so the function matches the calling
    convention used by ``keyMap``; neither is read.
    """
    farewell = "\n\nBye! See you again..."
    print(farewell)
    quit(0)
def schemaSetup(databPath=database, updateTime=False):
    """
    Sets up the schema for the database.

    The schema is a header framed by two ``schemaSeperate`` lines that holds
    the creation time, the last-login time and, when the user chose to
    protect the database, one extra "password;hash" line.

    With ``updateTime=False`` a fresh header is appended to ``databPath``
    after asking whether the database should be password protected; the
    function then returns ``None``.

    With ``updateTime=True`` the "Last login" line of the existing header is
    rewritten with the current time and ``(lower, upper)`` is returned:
    ``lower`` is the index of the opening separator line and ``upper`` the
    index of the closing one, increased by one when the header has no
    password line (callers slice the file with these bounds). If the file has
    no header yet, one is created first and the bounds of the new header are
    returned.

    Raises ``ValueError`` when only one separator line is present, and
    whatever ``open`` raises for an unreadable/unwritable file.
    """
    if not updateTime:
        sPrint(f"Setting up schema...\n", 34, 40)
        content = [
            f"{schemaSeperate}\n",
            f"Time created: {datetime.now()}\n",
            f"Last login: {datetime.now()}\n",
            f"{schemaSeperate}\n",
        ]
        while True:
            try:
                choice = input("Protect? (y/N): ")
                if choice.strip().lower().startswith("y"):
                    password = input("Password: ")
                    if not password:
                        print(
                            f"No password was assigned to the database ({databPath}) (was {pid()})"
                        )
                    else:
                        salt = sha512(password.encode("utf-8")).hexdigest()
                        # NOTE(review): the password is stored in plain text
                        # next to its unsalted hash. fetchData reads exactly
                        # this "password;hash" layout, so it is kept as is.
                        content.insert(3, f"{password};{salt}\n")
                        print(
                            f"Password was assigned to the database ({databPath}) (was {pid()})"
                        )
                        sPrint(f"Hash: {salt}\n", 37, 45)
                break
            except (KeyboardInterrupt, EOFError):
                # No more input is coming: write an unprotected header.
                print()
                break
            except UnicodeError:
                # The password could not be encoded; ask again.
                print("Invalid input!")
        with open(databPath, "a+") as db:
            db.writelines(content)
        return None

    with open(databPath) as db:
        data = [line.replace("\n", "") for line in db.readlines()]

    if schemaSeperate not in data:
        # No header yet: create it in the same file, then report its bounds
        # so callers that index the result keep working.
        schemaSetup(databPath)
        return schemaSetup(databPath, updateTime=True)

    lower = data.index(schemaSeperate)
    upper = data.index(schemaSeperate, lower + 1)
    if upper - lower == 3:
        # Header without a password line: callers expect the bound one
        # position further so both header layouts slice the same way.
        upper += 1
    data[lower + 2 : upper - 1] = [f"Last login: {datetime.now()}"]
    with open(databPath, "w+") as db:
        db.writelines(f"{line}\n" for line in data)
    return (lower, upper)
def fetchData(databPath=database, askPassword=False):
    """
    Fetches the data from the database & returns it.

    Returns a list of posts; each post is the list of lines found between two
    ``dataSeperate`` lines (the other commands expect title, content, date
    and author, in that order).

    Side effects: a missing file is created, a missing schema header is set
    up through ``schemaSetup`` (which may prompt), and the "Last login" line
    of the header is refreshed.

    When the header stores a password and ``askPassword`` is true, the user
    is prompted until the right password is entered; with
    ``askPassword=False`` the check is skipped. A warning is printed when the
    stored password and hash disagree and ``warning`` is still enabled.
    """
    try:
        with open(databPath, "r") as db:
            dbRead = db.read()
    except FileNotFoundError:
        # First run: create the file named by ``databPath`` and its header.
        with open(databPath, "w+"):
            pass
        schemaSetup(databPath)
        return []

    if schemaSeperate not in dbRead:
        schemaSetup(databPath)
        # Read again so the lines parsed below include the new header.
        with open(databPath, "r") as db:
            dbRead = db.read()
    limits = schemaSetup(databPath, updateTime=True)
    readData = dbRead.split("\n")

    # The optional credentials line sits right after "Last login"; for an
    # unprotected database that position holds the closing separator.
    password, salt = "", ""
    try:
        credential = readData[limits[0] + 3 : limits[1]][0]
    except (TypeError, IndexError):
        credential = schemaSeperate
    if ";" in credential:
        # Split on the last ";" so a password containing ";" still parses
        # (the hexadecimal hash never contains one).
        password, salt = credential.rsplit(";", 1)
    elif credential != schemaSeperate:
        password = credential

    if (not sha512(password.encode("utf-8")).hexdigest() == salt) and (
        salt != password
    ):
        if warning:
            sPrint(
                '\nDeprecate Warning: Database manually modified! Don\'t do that, it can cause failure of data! Run "dw" command to disable the warning for current session',
                37,
                41,
            )

    while True:
        pInp = ""
        if password:
            pInp = input("\nPassword: ") if askPassword else password
        if pInp == password:
            break
        print("Invalid password!")

    # Cut the file into the chunks that end at each dataSeperate line.
    chunks = []
    previous = 0
    for position, line in enumerate(readData):
        if line == dataSeperate:
            chunks.append(readData[previous + 1 : position])
            previous = position
    if chunks:
        # The first chunk is the schema header, not a post.
        chunks.pop(0)
    # Two adjacent separator lines produce an empty chunk; drop those.
    return [chunk for chunk in chunks if chunk]
def showCommands(greet=True):
    """
    Print the list of available commands, one per line.

    With ``greet`` true the welcome banner is printed before the commands.
    """
    banner = (
        "\nWelcome to CMS: Content Management System",
        "Here're some commands you can operate:\n",
    )
    commandHelp = (
        "create (c) : Create post",
        "show (r) : Show all posts",
        "update <?index> (u) : Update post",
        "delete <?index> (d) : Delete post",
        "view <index> (v) : View post",
        "count (cp) : Count no. of posts",
        "export (e) : Export post",
        "quit (q) : Quit CMS",
        "help (h) : Help",
        "about (a) : About CMS",
    )
    if greet:
        for line in banner:
            print(line)
    for line in commandHelp:
        print(line)
def disbleWarning(flag="", execUser=True):
    """
    Turn the module-level ``warning`` switch off and confirm it on screen.

    ``flag`` and ``execUser`` exist only so the function matches the calling
    convention used by ``keyMap``; neither is read.
    """
    global warning
    confirmation = "Warnings are now disabled for this session!"
    warning = False
    print(confirmation)
def about(flag="", execUser=True):
    """
    Print the project description and author details.

    ``flag`` and ``execUser`` exist only so the function matches the calling
    convention used by ``keyMap``; neither is read.
    """
    description = """
CMS: Content Management System
Manages content specifically for a blogging website which have many posts & requires to update its content on daily basis.
This project is based on functional approach & specifically designed for processing huge amount of data.
The project supports all types of CRUD operations.
-- Note: Designed for School project --
Name: Yogesh
Class: XII-B
Roll. no.: 34
"""
    print(description)
def help(flag="", execUser=True):
    """
    Print a heading followed by the command list (without the welcome banner).

    ``flag`` and ``execUser`` exist only so the function matches the calling
    convention used by ``keyMap``; neither is read.
    """
    heading = "Here're some commands you can operate:\n"
    print(heading)
    showCommands(greet=False)
# Dispatch table used by main(): maps every command word and its short alias
# to the function implementing it. main() calls each handler with the
# keyword arguments ``flag`` and ``execUser``.
keyMap = {
    # Create a post
    "c": create,
    "create": create,
    # Show all posts
    "r": show,
    "show": show,
    # View a single post
    "v": view,
    "view": view,
    # Update a post
    "u": update,
    "update": update,
    # Delete a post
    "d": delete,
    "delete": delete,
    # Count entries
    "cp": count,
    "count": count,
    # Quit the program
    "q": cmsQuit,
    "quit": cmsQuit,
    # Export a post to a file
    "e": export,
    "export": export,
    # About the project
    "a": about,
    "about": about,
    # Disable the warning printed by fetchData for this session
    "dw": disbleWarning,
    # Help: list the commands
    "h": help,
    "help": help,
}
def main():
    """
    Takes the command & redirects to different functions.

    Reads commands in an endless loop. The first word selects the handler
    from ``keyMap``; an optional second word, when it is a non-zero integer,
    is passed on as ``flag`` with ``execUser=True``. Without such a number
    the handler receives ``flag=15`` and ``execUser=False``.

    An unknown command prints "Invalid command!"; an error raised by a
    handler is reported separately so it is not mistaken for a typo.
    Ctrl+C or end of input leaves the program through ``cmsQuit``.
    """
    while True:
        try:
            # split() without arguments ignores leading/repeated spaces.
            words = input("\n> ").lower().split()
            if not words:
                continue
            command = words[0]

            flag = ""
            if len(words) > 1:
                try:
                    flag = int(words[1])
                except ValueError:
                    flag = ""
            execUser = bool(flag)
            if not flag:
                flag = 15

            handler = keyMap.get(command)
            if handler is None:
                print("Invalid command!")
                continue
            try:
                handler(flag=flag, execUser=execUser)
            except Exception as error:
                print(f"Command failed: {error}")
        except (KeyboardInterrupt, EOFError):
            cmsQuit()
if __name__ == "__main__":
    # Entry point when the file is run as a script: announce the chosen
    # database, load it once (asking for the password if one is set), list
    # the commands and hand control to the command loop.
    sPrint(f'\nUsing "{database}" as database', 34, 40)
    fetchData(askPassword=True)
    showCommands()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment