Skip to content

Instantly share code, notes, and snippets.

@KhaledBinAmir
Last active August 13, 2023 08:24
Show Gist options
  • Save KhaledBinAmir/091f6d431d935149451a8f6471e3e50e to your computer and use it in GitHub Desktop.
Save KhaledBinAmir/091f6d431d935149451a8f6471e3e50e to your computer and use it in GitHub Desktop.
Telegram Webhook Handler for Employee Self Service using Server Script of ERPNext Frappe Framework
# Telegram webhook handler for Employee Self Service (Frappe / ERPNext Server Script).
#
# Everything is read from frappe.form_dict. Three kinds of request are handled:
#   * request_source == "Employee List": SMS an enrollment link (carrying a
#     one-time 'MKTS_' secret) to every listed employee that has a cell number
#     but no linked Telegram chat yet.
#   * message: a Telegram text message from a private chat; either the
#     "/employee" menu command or an enrollment message containing the secret.
#   * callback_query: an inline-keyboard button press ("employee", "emp::*",
#     "doc::<doctype>::<name>::<print format>").
#
# The script deliberately avoids imports, helper functions and underscore-prefixed
# names so that it keeps running inside the restricted Server Script sandbox.
#
# NOTE(review): the original paste had lost all indentation; the nesting below
# was reconstructed from the if/elif/else pairing. Two placements were
# ambiguous and are marked inline - please confirm them.
try:
    request_source = frappe.form_dict.request_source
    message = frappe.form_dict.message
    callback_query = frappe.form_dict.callback_query

    if request_source == "Employee List":
        employee_ids = json.loads(frappe.form_dict.employees)
        for employee_id in employee_ids:
            employee = frappe.db.get_value(
                'Employee',
                employee_id,
                ['cell_number', 'telegram_chat_id', 'employee_name', 'telegram_secret'],
                as_dict=1,
            )
            # get_value returns None for an unknown id; skip it instead of
            # letting an AttributeError abort the rest of the batch.
            if not employee:
                continue
            if not employee.telegram_chat_id and employee.cell_number:
                telegram_secret = employee.telegram_secret
                if not telegram_secret:
                    # First invitation for this employee: mint and store a secret.
                    telegram_secret = 'MKTS_' + frappe.utils.generate_hash(length=20)
                    frappe.db.set_value('Employee', employee_id, 'telegram_secret', telegram_secret)
                # Kept in its own variable so the incoming Telegram `message`
                # payload is not overwritten.
                sms_text = 'Hi '+employee.employee_name+',\nplease link your Telegram to your Employee Profile via this url (make sure Telegram app is set up first):\n\nhttps://t.me/MK_Electronics_Bot?start='+telegram_secret
                frappe.call(
                    'frappe.core.doctype.sms_settings.sms_settings.send_sms',
                    receiver_list=[employee.cell_number],
                    msg=sms_text,
                    success_msg=False,
                )
        # NOTE(review): assumed to run once after the loop - confirm.
        frappe.response['message'] = "Sent to non telegram enlisted employees"

    elif message or callback_query:
        # A callback query does not always carry the originating message,
        # so read it defensively.
        source_message = message if message else callback_query.get('message')
        chat = source_message.get('chat') if source_message else None
        if chat:
            chat_type = chat['type']
            chat_id = chat['id']
            # Self service is only offered in one-to-one chats.
            if chat_type == 'private':
                # 'text' is absent for photos, stickers, contacts, etc.
                chat_text = message.get('text') if message else None
                callback_data = callback_query['data'] if callback_query else None
                message_id = callback_query['message']['message_id'] if callback_query else None

                # ---- Main menu -------------------------------------------------
                if chat_text == "/employee" or callback_data == "employee":
                    chat_employee = frappe.db.get_value('Employee', {'telegram_chat_id': chat_id}, ['name', 'employee_name'], as_dict=True)
                    if chat_employee:
                        reply_markup = {
                            "inline_keyboard": [
                                [{"text": "Attendance Report", "callback_data": "emp::attendance_report"}],
                                [{"text": "Leave Balance", "callback_data": "emp::leave_balance"}],
                                [{"text": "Salary Slip", "callback_data": "emp::salary"}],
                                [{"text": "Profile", "callback_data": "emp::profile"}],
                            ]
                        }
                        reply_message = 'Please select employee services'
                        # message_id is passed only when the menu was requested
                        # through a button press.
                        frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message, reply_markup=reply_markup, message_id=message_id if callback_data else None)
                    else:
                        reply_message = 'This telegram account is not linked with any employee profile.\n\nFor assistance, please contact HR.'
                        frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message)

                if message:
                    # ---- Enrollment: text containing an 'MKTS_' secret --------
                    if chat_text and 'MKTS_' in chat_text:
                        # Take the first 'MKTS_' occurrence whose following
                        # characters (up to 20) are all alphanumeric.
                        secret_key = None
                        start = chat_text.find('MKTS_')
                        while start != -1:
                            candidate = chat_text[start:start + 25]
                            if candidate[5:].isalnum():
                                secret_key = candidate
                                break
                            start = chat_text.find('MKTS_', start + 1)

                        if secret_key:
                            matched_employee = frappe.db.get_value('Employee', {'telegram_secret': secret_key}, ['name', 'employee_name', 'telegram_chat_id'], as_dict=True)
                            if matched_employee:
                                if matched_employee.telegram_chat_id:
                                    # The employee already has a chat linked;
                                    # say whether it is this one or another.
                                    which_account = 'another Telegram account'
                                    if matched_employee.telegram_chat_id == str(chat_id):
                                        which_account = 'this Telegram account'
                                    reply_message = 'Dear ' + matched_employee.employee_name + ',\n' + which_account + ' is already assigned for your employee notifications.\n\nFor assistance, please contact HR.'
                                    frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message)
                                else:
                                    # One Telegram chat may be linked to at most one employee.
                                    chat_id_exist = frappe.db.get_value('Employee', {"telegram_chat_id": chat_id}, ['name', 'employee_name', 'telegram_chat_id'], as_dict=True)
                                    if chat_id_exist:
                                        reply_message = 'This telegram account is already linked with employee profile of '+chat_id_exist.name+' : '+chat_id_exist.employee_name+' for notifications.\n\nFor assistance, please contact HR.'
                                        frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message)
                                    else:
                                        # Link the chat and clear the secret so it cannot be reused.
                                        frappe.db.set_value('Employee', matched_employee.name, {'telegram_chat_id': chat_id, 'telegram_secret': None})
                                        reply_message = "Dear " + matched_employee.employee_name + ",\nYour Telegram account has been successfully linked to your Employee profile. You'll receive notifications here."
                                        frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message)
                            else:
                                # NOTE(review): this branch was paired with
                                # `if matched_employee` (secret unknown) - confirm.
                                reply_message = 'Hello ' + message['from']['first_name'] + ',\nYour enrollment url appears to be invalid.\n\nPlease reach out to HR for assistance.'
                                frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message)

                elif callback_query:
                    # ---- Document PDF request ---------------------------------
                    if callback_data.startswith('doc::'):
                        # NOTE(review): security - this branch does not check
                        # that the chat is linked to an employee, nor that the
                        # requested document belongs to the requester. Anyone
                        # able to post a crafted callback could request another
                        # employee's salary slip. Behaviour left unchanged
                        # because other callers may depend on it; consider an
                        # ownership check here.
                        doc_ref = callback_data.split('::')
                        doctype_name = doc_ref[1]
                        doc_name = doc_ref[2]
                        print_format = doc_ref[3]
                        reply_message = doctype_name + ' : ' + doc_name
                        frappe.call('mke.custom_api.enqueue_telegram_send_doc_pdf', reciepient_chat_ids=[chat_id], caption=reply_message, doctype=doctype_name, doc_name=doc_name, print_format=print_format, print_letterhead=True, enqueue_after_commit=False, message_id=message_id)
                        frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption='Please wait for PDF of:\n'+reply_message, message_id=message_id)

                    # ---- Employee self-service actions ------------------------
                    elif callback_data.startswith("emp::"):
                        chat_employee = frappe.db.get_value(
                            'Employee',
                            {'telegram_chat_id': chat_id},
                            ['name', 'employee_name', 'company', 'designation', 'department', 'date_of_joining'],
                            as_dict=True,
                        )
                        if chat_employee:
                            if callback_data == "emp::attendance_report":
                                reply_markup = {
                                    "inline_keyboard": [
                                        [{"text": "This Month's Attendance", "callback_data": "emp::this_months_attendance"}],
                                        [{"text": "Last Month's Attendance", "callback_data": "emp::last_months_attendance"}],
                                        [{"text": "« Main Menu", "callback_data": "employee"}],
                                    ]
                                }
                                reply_message = 'Select Report Period'
                                frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message, reply_markup=reply_markup, message_id=message_id)

                            elif callback_data == "emp::this_months_attendance" or callback_data == "emp::last_months_attendance":
                                month_date = frappe.utils.getdate() if callback_data == "emp::this_months_attendance" else frappe.utils.add_to_date(frappe.utils.getdate(), months=-1)
                                date_from = frappe.utils.get_first_day(month_date)
                                date_to = frappe.utils.get_last_day(month_date)
                                attendance_report = frappe.call("frappe.desk.query_report.run", report_name="Attendance Report", filters={"employee": chat_employee.name, "date_from": date_from, "date_to": date_to})
                                report_formatted = ['<b>Attendance Report</b>']
                                # The summary may be missing or None; treat that as empty.
                                for row in (attendance_report.get('report_summary') or []):
                                    report_formatted.append(row['label'] + ' : ' + str(row['value']))
                                report_formatted.append('--------')
                                for row in attendance_report['result']:
                                    date = frappe.utils.format_date(row['attendance_date'], 'dd MMM')
                                    status = row['status']
                                    # LE = late entry, EE = early exit.
                                    flags = []
                                    if row['late_entry']:
                                        flags.append('LE')
                                    if row['early_exit']:
                                        flags.append('EE')
                                    late_early = ' '.join(flags)
                                    report_formatted.append(date + ((' | ' + status) if status else '') + ((' | ' + late_early) if late_early else ''))
                                reply_message = '\n'.join(report_formatted)
                                frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message, message_id=message_id)

                            elif callback_data == "emp::leave_balance":
                                # Balance from the start of the current year up to today.
                                today = frappe.utils.getdate()
                                date_from = frappe.utils.get_year_start(today)
                                date_to = today
                                leave_report = frappe.call("frappe.desk.query_report.run", report_name="Employee Leave Balance", filters={"employee": chat_employee.name, "from_date": date_from, "to_date": date_to, "company": chat_employee.company})
                                report_formatted = ['<b>Leave Balance</b>', '--------', '']
                                for row in leave_report['result']:
                                    # Only list leave types that were allocated or used.
                                    if row['leaves_allocated'] or row['leaves_taken']:
                                        report_formatted.append('<b>' + row['leave_type'] + ' :</b> ' + str(row['closing_balance']))
                                reply_message = '\n'.join(report_formatted)
                                frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message, message_id=message_id)

                            elif callback_data == "emp::profile":
                                # Use the employee's real joining date (this was a
                                # hard-coded placeholder) and tolerate empty fields,
                                # which would otherwise break the concatenation.
                                joining_date = ''
                                if chat_employee.date_of_joining:
                                    joining_date = frappe.utils.format_date(chat_employee.date_of_joining, 'dd-MM-yyyy')
                                reply_message = "Employee Name: " + (chat_employee.employee_name or '') + "\nJoining Date: " + joining_date + "\nDesignation: " + (chat_employee.designation or '') + "\nDepartment: " + (chat_employee.department or '')
                                frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message, message_id=message_id)

                            elif callback_data == "emp::salary":
                                # Offer the five most recent salary slips as buttons.
                                salary_slips = frappe.db.get_list("Salary Slip", filters={"employee": chat_employee.name}, fields=["name", "start_date"], order_by='start_date desc', page_length=5)
                                if salary_slips:
                                    keyboard_data = []
                                    for ss in salary_slips:
                                        # NOTE(review): Telegram limits callback_data to 64
                                        # bytes; long slip names may exceed it - verify.
                                        keyboard_data.append([{"text": frappe.utils.format_date(ss.start_date, 'MMMM yyyy'), "callback_data": "doc::Salary Slip::"+ss.name+"::Salary Slip Standard"}])
                                    reply_markup = {"inline_keyboard": keyboard_data}
                                    reply_message = 'Select your salary slip'
                                    frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message, reply_markup=reply_markup, message_id=message_id)
                                else:
                                    reply_message = 'No Salary Slip is found for employee '+chat_employee.name+ ' : '+chat_employee.employee_name+'\n\nContact HR for further assistance.'
                                    frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message, message_id=message_id)

                            else:
                                reply_message = 'This service is not available yet'
                                frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message, message_id=message_id)
                        else:
                            reply_message = 'This telegram account is not linked with any employee profile.\n\nFor assistance, please contact HR.'
                            frappe.call('mke.custom_api.send_telegram_message', reciepient_chat_ids=[chat_id], caption=reply_message)
except Exception as e:
    # Top-level boundary of the webhook: record the failure in the Error Log
    # rather than letting the exception propagate out of the script.
    frappe.log_error(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment