Skip to content

Instantly share code, notes, and snippets.

@jeann2013
Created June 4, 2019 17:07
Show Gist options
  • Save jeann2013/694277945a057b6f92e8413dfcfef285 to your computer and use it in GitHub Desktop.
Save jeann2013/694277945a057b6f92e8413dfcfef285 to your computer and use it in GitHub Desktop.
from flask import jsonify, make_response, request, url_for, send_file
from firebase_admin import db as firebase_db, firestore
from . import payment
from ..models import *
from app import get_app
import json
import arrow
from datetime import datetime, date, timedelta
from ..logger import get_logger
import stripe
import time
from app import get_app
app = get_app()
logger = get_logger(__name__)
firestore_db = firestore.client()
config = app.config
stripe.api_key = config['STRIPE_SECRET_KEY']
def setApiKeyByHeader():
domain = request.headers.get('domain', None)
key = getKeyByDomain(domain)
stripe.api_key = key
def getKeyByDomain(domain):
if domain in config['OUR_APP_DOMAINS']:
return config['STRIPE_SECRET_KEY']
domain = firebase_db.reference('domains/{}'.format(domain)).get()
if domain is None:
return config['STRIPE_SECRET_KEY']
return domain.get('key_stripe', config['STRIPE_SECRET_KEY'])
def create_plan_test():
stripe.Plan.create(
product={
"name": "Map Labs"
},
nickname="Ultimate",
id="ultimate-licensed",
currency="usd",
usage_type='licensed',
interval="month",
billing_scheme="tiered",
tiers_mode="volume",
tiers=config['PLAN_ULTIMATE_TIERS']
)
stripe.Plan.create(
product={
"name": "Map Labs"
},
nickname="Ultimate Wholesale",
id="ultimate-wholesale",
currency="usd",
usage_type='licensed',
interval="month",
billing_scheme="tiered",
tiers_mode="volume",
tiers=config['PLAN_WHOLESALE_TIERS']
)
return make_response(jsonify({'response': 'success'}), 200)
def create_test_client():
customer = stripe.Customer.create(
description="Customer for ngonzalez3090@gmail.com",
email="ngonzalez3090@gmail.com"
)
time_now = arrow.utcnow().replace(minutes=+2)
next_month = time_now.replace(months=+1, day=1).timestamp
end_trial = time_now.timestamp
subscription = stripe.Subscription.create(
customer=customer.id,
items=[{'plan': 'ultimate-new', 'quantity': 10, }],
trial_end=end_trial,
billing_cycle_anchor=next_month,
)
def update_customer_test(subscription_id):
subscription = stripe.Subscription.retrieve(subscription_id)
invoices = stripe.Invoice.list(subscription=subscription_id)
print(invoices)
stripe.Subscription.modify(subscription_id,
cancel_at_period_end=True,
items=[{
'id': subscription['items']['data'][0].id,
'plan': 'ultimate-new',
'quantity': 3
}]
)
def gid_by_email(email):
user = auth.get_user_by_email(email, app=None)
ref_users = firebase_db.reference('users')
obj_user = ref_users.order_by_child('uid').equal_to(user.uid).get()
for _id in obj_user:
return obj_user[_id].get('gid', None)
return ''
def update_objec_subscription(_object):
metadata = _object.get('metadata', {})
gid = metadata.get('gid', None)
if gid is None:
customer = stripe.Customer.retrieve(_object.get('customer', None))
gid = gid_by_email(customer.email)
group_stats_ref = firebase_db.reference('group_stats/{}'.format(gid))
# to valid if no exists a plan into of customer and proceed to cancel
if _object.get('plan', {}) == {}:
_data = {
'status': 'canceled'
}
group_stats_ref.update(_data)
logger.debug('group stats updated on canceled')
return True
amount = _object.get('plan', {}).get('amount', -1)
status = _object.get('status', 'canceled')
trial_end = _object.get('trial_end', '')
if trial_end != '':
trial_end = arrow.get(trial_end).format('YYYY-MM-DD HH:mm:ss')
current_period_end = _object.get('current_period_end', '')
if current_period_end != '':
current_period_end = arrow.get(current_period_end).format('YYYY-MM-DD HH:mm:ss')
_data = {
'status': status,
'free_plan': amount == 0,
'paid_plan': status == 'active' and amount != 0,
'trial': status == 'trialing',
'active_plan': _object.get('plan', {}).get('nickname', 'No Plan'),
'active_plan_id': _object.get('plan', {}).get('id', 'ultimate'),
'trial_end': trial_end,
'current_period_end': current_period_end
}
group_stats_ref.update(_data)
logger.debug('group stats updated')
return True
@payment.route('/webhook', methods=['POST'])
def receive_webhook():
data = json.loads(request.data)
if data is None:
return make_response(jsonify({'response': 'without data'}), 200)
stripe.api_key = config['STRIPE_SECRET_KEY']
try:
event_type = data.get('type', None)
data = data.get('data', None)
_object = dict(data.get('object', None))
logger.debug('event selected')
logger.debug(event_type)
if event_type == 'customer.subscription.created':
update_objec_subscription(_object)
if event_type == 'customer.subscription.updated':
update_objec_subscription(_object)
if event_type == 'customer.subscription.trial_will_end':
update_objec_subscription(_object)
if event_type == 'customer.subscription.deleted':
update_objec_subscription(_object)
if event_type == 'invoice.payment_succeeded':
subscription = stripe.Subscription.retrieve(id=_object.get('subscription', None))
update_objec_subscription(subscription)
if event_type == 'invoice.payment_failed':
subscription = stripe.Subscription.retrieve(id=_object.get('subscription', None))
update_objec_subscription(subscription)
except Exception as e:
logger.debug('error on webhooks')
logger.debug(e)
return make_response(jsonify({'response': 'success'}), 200)
@payment.route('/subscription/<string:subscription_id>/upgrade/locations', methods=['POST'])
def upgrade_locations(subscription_id):
setApiKeyByHeader()
uid = request.headers.get('uid', None)
gid = request.headers.get('gid', None)
if gid is None or uid is None:
logger.debug('Bad headers - UID: {} - GID: {}'.format(uid, gid))
return make_response(jsonify({'error': 'Bad Headers - Unauthorized'}), 401)
data = json.loads(request.data)
locations = data.get('locations', [])
print(locations)
return make_response(jsonify({'result': 'ok'}), 200)
@payment.route('/setup_gmb_plans', methods=['POST'])
def setup_gmb_plans():
"""Setting up GMB plans for Stripe and Firebase"""
plans = [
{
'nickname': 'Free',
'interval': 'month',
'currency': 'usd',
'amount': 0,
'max_locations': 1
},
{
'nickname': 'Business',
'interval': 'month',
'currency': 'usd',
'amount': 1000,
'max_locations': 10
},
{
'nickname': 'Agency',
'interval': 'month',
'currency': 'usd',
'amount': 8000,
'max_locations': 100
},
]
"""First step - Create product on stripe (GMB Paid Account) """
valid, payload = stripe_create_product_func('GMB Paid Account')
"""If valid (no errors with Stripe), create plans (Free, Business, Agency)"""
if valid is True:
payload = dict(payload)
product_id = None
if 'id' in payload:
product_id = payload['id']
payload.pop('id')
payload['product_id'] = product_id
if product_id is not None:
StripeProduct(**payload).save()
errors = []
for plan in plans:
valid_plan, plan_payload = stripe_create_plan_func(product_id, plan['nickname'], plan['interval'],
plan['currency'], plan['amount'])
if valid_plan:
plan_payload = dict(plan_payload)
if 'id' in plan_payload:
plan_id = plan_payload['id']
plan_payload.pop('id')
plan_payload['plan_id'] = plan_id
plan_payload['max_locations'] = plan['max_locations']
firebase_plan_id = plan['nickname'].lower() + '_plan'
plan_payload['firebase_plan_id'] = firebase_plan_id
StripePlan(**plan_payload).save()
firebase_db.reference('plans').update({'{}'.format(firebase_plan_id): plan_payload})
else:
errors.append({
'plan': plan,
'error': plan_payload
})
if len(errors) > 0:
return make_response(jsonify({'error': errors}), 500)
else:
return make_response(jsonify({'Response': 'Plans added!'}), 200)
else:
return make_response(jsonify({'error': payload}), 500)
@payment.route('/test_charge', methods=['GET'])
def test_stripe_charge():
try:
create_plans()
return make_response(jsonify({'response': 'success'}), 200)
except Exception as e:
raise e
return make_response(jsonify({'error': str(e)}), 500)
@payment.route('/test_create_product', methods=['GET'])
def test_stripe_create_product():
valid, payload = stripe_create_product_func('Test Product')
if valid is True:
payload = dict(payload)
if 'id' in payload:
product_id = payload['id']
payload.pop('id')
payload['product_id'] = product_id
StripeProduct(**payload).save()
return make_response(jsonify({'response': payload}), 200)
else:
return make_response(jsonify({'error': payload}), 500)
@payment.route('/test_create_plan', methods=['GET'])
def test_stripe_create_plan():
return make_response(jsonify({'response': 'Something'}), 200)
@payment.route('/test_create_customer', methods=['GET'])
def test_stripe_create_customer():
return make_response(jsonify({'response': 'Something'}), 200)
@payment.route('/test_create_subscription', methods=['GET'])
def test_stripe_create_subscription():
return make_response(jsonify({'response': 'data'}), 200)
@payment.route('/create_trial_customer', methods=['POST'])
def create_trial_customer():
setApiKeyByHeader()
gid = request.headers.get('gid', None)
uid = request.headers.get('uid', None)
if gid is None or uid is None:
logger.debug('Bad headers - UID: {} - GID: {}'.format(uid, gid))
return make_response(jsonify({'error': 'Bad Headers - Unauthorized'}), 401)
data = request.data
data = json.loads(data)
user = data.get('user', None)
# plan = data.get('plan', None)
_plan = config['PLAN_ULTIMATE_ID']
plan = {
'plan_id': _plan,
'nickname': 'Ultimate Plan',
'firebase_plan_id': 'Ultimate Plan'
}
timezone = data.get('timezone', '')
if user is None is None or plan is None:
logger.debug('Incomplete data')
logger.debug('USER: {}'.format(user))
logger.debug('PLAN: {}'.format(plan))
return make_response(jsonify({'error': 'Incomplete data'}), 400)
valid, payload = stripe_create_customer_func(user['email'], None, gid)
if valid:
"""Create customer subscription for TRIAL"""
payload = dict(payload)
logger.debug('---' * 30 + 'CUSTOMER CREATED SUCCESSFULLY')
logger.debug(payload)
customer_id = payload['id']
subs_valid, subs_payload = stripe_create_subscription_func(
customer_id,
[{'plan': plan['plan_id'], 'quantity': 0}],
gid,
trial=True
)
if subs_valid:
subs_payload = dict(subs_payload)
logger.debug('---' * 30 + 'SUBSCRIPTION CREATED SUCCESSFULLY')
logger.debug(subs_payload)
try:
trial_end = datetime.utcfromtimestamp(subs_payload.get('trial_end', None))
trial_end = trial_end.strftime('%Y-%m-%d %H:%M:%S')
except Exception as e:
trial_end = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
firebase_db.reference('group_stats/{}'.format(gid)).update({
'accounts': 0,
'users': 1,
'active_plan': 'Ultimate Plan',
'active_plan_id': plan['plan_id'],
'locations_count': 0,
'subscription_id': subs_payload['id'],
'customer_id': customer_id,
'trial': True,
'free_plan': False,
'paid_plan': False,
'subscription_canceled': False,
'subscription_paused': False,
'status': subs_payload['status'],
'trial_end': trial_end,
'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'client_timezone': timezone if timezone is not None else 0
})
firebase_db.reference('users/{}'.format(uid)).set(user)
return make_response(jsonify({'response': subs_payload}), 200)
else:
logger.debug('---' * 30 + 'ERROR CREATING SUBSCRIPTION')
logger.debug(subs_payload)
return make_response(jsonify({'error': 'Error subcription'}), 400)
else:
logger.debug('---' * 30 + 'ERROR CREATING CUSTOMER')
logger.debug(payload)
return make_response(jsonify({'error': payload}), 400)
@payment.route('/plans', methods=['GET'])
def get_plans():
setApiKeyByHeader()
gid = request.headers.get('gid', None)
uid = request.headers.get('uid', None)
if gid is None or uid is None:
logger.debug('Bad headers - UID: {} - GID: {}'.format(uid, gid))
return make_response(jsonify({'error': 'Bad Headers - Unauthorized'}), 401)
plans = stripe.Plan.list()
plans = plans.get('data', [])
return make_response(jsonify({'response': plans}), 200)
def switch_plan(subscription, customer_id, new_plan):
try:
subscription.delete()
except Exception as e:
logger.debug('Error delete subscription')
logger.debug(e)
subscription = stripe.Subscription.create(
customer=customer_id,
items=[
{
"plan": new_plan
},
]
)
return subscription
@payment.route('/plan/clean', methods=['POST'])
def clean_sub():
setApiKeyByHeader()
gid = request.headers.get('gid', None)
uid = request.headers.get('uid', None)
if gid is None or uid is None:
return make_response(jsonify({'error': 'Bad Headers - Unauthorized'}), 401)
data = json.loads(request.data)
data = data.get('data')
customer_id = data.get('customer_id', None)
customer = stripe.Customer.retrieve(customer_id)
# logger.debug(customer)
# subscription_id = data.get('subscription_id', None)
try:
subscription_id = customer.subscriptions.data[0].id
logger.debug('id =>')
logger.debug(subscription_id)
subscription = stripe.Subscription.retrieve(id=subscription_id)
logger.debug('subscription --->')
logger.debug(subscription.status)
status = subscription.status
trial_end = subscription.trial_end
if subscription.discount is not None:
coupon = subscription.discount.coupon.id
obj_coupon = subscription.discount.coupon
else:
coupon = ''
except Exception as e:
trial_end = None
coupon = ''
try:
subscription.delete()
except Exception as e:
logger.debug('Error update subscription')
logger.debug(e)
return make_response(jsonify({'response': 'fail'}), 200)
logger.debug(trial_end)
if status == 'trialing':
time_now = arrow.get(trial_end)
next_month = time_now.replace(months=+1, day=1, hour=0, minute=0, second=0, microsecond=0).timestamp
subscription = stripe.Subscription.create(
customer=customer.id,
items=[{'plan': 'ultimate-licensed', 'quantity': 0, }],
trial_end=trial_end,
billing_cycle_anchor=next_month,
coupon=coupon,
metadata={'gid': gid}
)
else:
time_now = arrow.utcnow()
next_month = time_now.replace(months=+1, day=1, hour=0, minute=0, second=0, microsecond=0).timestamp
subscription = stripe.Subscription.create(
customer=customer.id,
items=[{'plan': 'ultimate-licensed', 'quantity': 0, }],
billing_cycle_anchor=next_month,
coupon=coupon,
metadata={'gid': gid}
)
subscription_id = subscription.id
if coupon != '':
try:
subscription.trial_end = 'now'
subscription.save()
except Exception as e:
logger.debug('error on save trial')
logger.debug(e)
try:
_locations = firebase_db.reference('saved_locations/{}'.format(gid)).get()
accounts = _locations.get('accounts', None)
for _acount in accounts:
locations = accounts[_acount].get('locations', [])
for loc in locations:
name = locations[loc].get('name', None)
firebase_db.reference('group_stats/{}'.format(gid)).child('upgraded_locations').update(
{'{}'.format(name): True})
logger.debug(name)
except Exception as e:
logger.debug('error to get locations')
logger.debug(e)
# for location in locations:
# firebase_db.reference('group_stats/{}'.format(gid)).child('upgraded_locations').update({'{}'.format(location.get('name',None)): True})
subscription = stripe.Subscription.retrieve(id=subscription_id)
new_plan = 'ultimate-licensed'
nickname = 'Ultimate Plan'
if trial_end is not None:
trial_end = datetime.utcfromtimestamp(trial_end).strftime('%Y-%m-%d %H:%M:%S')
else:
trial_end = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
logger.debug("trial end")
logger.debug(subscription.trial_end)
logger.debug(trial_end)
_data = {
'status': subscription.status,
'free_plan': False,
'paid_plan': subscription.status == 'active',
'trial': subscription.status == 'trialing',
'active_plan': nickname,
'active_plan_id': new_plan,
'subscription_id': subscription.id,
'trial_end': trial_end
}
if coupon != '':
_data['coupon'] = obj_coupon
group_stats_ref = firebase_db.reference('group_stats/{}'.format(gid))
group_stats_ref.update(_data)
return make_response(jsonify({'response': 'ok'}), 200)
@payment.route('/invoice/pay', methods=['POST'])
def pay_invoice():
setApiKeyByHeader()
gid = request.headers.get('gid', None)
uid = request.headers.get('uid', None)
if gid is None or uid is None:
return make_response(jsonify({'error': 'Bad Headers - Unauthorized'}), 401)
data = json.loads(request.data)
data = data.get('data', None)
id_invoice = data.get('id_invoice', None)
subscription_id = data.get('subscription_id', None)
active_plan = data.get('active_plan', None)
try:
invoice = stripe.Invoice.retrieve(id_invoice)
invoice.pay()
is_paid = True
except Exception as e:
logger.debug(e)
is_paid = False
subscription = stripe.Subscription.retrieve(id=subscription_id)
_data = {
'status': subscription.get('status', None),
'free_plan': active_plan == 'free_forever',
'paid_plan': active_plan == 'ultimate' and subscription.get('status', None) == 'active',
'trial': False
}
group_stats_ref = firebase_db.reference('group_stats/{}'.format(gid))
group_stats_ref.update(_data)
if is_paid:
return make_response(jsonify({'response': 'Invoice Paid'}),
200)
else:
return make_response(jsonify({'response': 'The transaction failed. Please try again or check with your bank.'}),
200)
@payment.route('/plan/upgrade', methods=['POST'])
def upgrade_client_plan():
setApiKeyByHeader()
gid = request.headers.get('gid', None)
uid = request.headers.get('uid', None)
if gid is None or uid is None:
return make_response(jsonify({'error': 'Bad Headers - Unauthorized'}), 401)
data = json.loads(request.data)
card_element = data.get('card', None)
locations = data.get('locations', None)
logger.debug('locations')
logger.debug(locations)
group_stats_ref = firebase_db.reference('group_stats/{}'.format(gid))
group_stats = group_stats_ref.get()
locations_count = group_stats.get('locations_count', 1)
subscription_id = group_stats.get('subscription_id', None)
customer_id = group_stats.get('customer_id', None)
logger.debug('CARD')
logger.debug(card_element)
customer = stripe.Customer.retrieve(customer_id)
# it's a different card
if card_element is not None:
try:
customer.source = card_element['id']
customer.save()
card_element = customer.sources.data[0]
group_stats_ref.update({'payment_method': card_element})
except Exception as e:
logger.debug('Error on Card')
logger.debug(e)
return make_response(
jsonify({'response': 'The transaction failed. Please try again or check with your bank.'}), 200)
subscription = stripe.Subscription.retrieve(id=subscription_id)
# check if subscription it's correct
try:
subscription.delete()
except Exception as e:
logger.debug('Error cancel subscription')
logger.debug(e)
count_locations = len(locations) if len(locations) > 0 else 1
next_month = arrow.utcnow().replace(months=+1, day=1, hour=0, minute=0, second=0, microsecond=0).timestamp
try:
subscription = stripe.Subscription.create(
customer=customer_id,
cancel_at_period_end=False,
items=[{
'plan': subscription['items']['data'][0].plan.id,
'quantity': len(locations),
}],
billing_cycle_anchor=next_month,
metadata={'gid': gid}
)
subscription_id = subscription.id
except Exception as e:
logger.debug('Error response')
logger.debug(e)
return make_response(jsonify({'error': str(e)}), 400)
logger.debug('Try to pay and subscription data')
_subscription = stripe.Subscription.retrieve(id=subscription_id)
# get accounts to upgrade
_data = {
'status': _subscription.get('status', None),
'paid_plan': _subscription.get('status', None) == 'active',
'trial': False,
'active_plan': _subscription['items']['data'][0].plan.nickname,
'active_plan_id': _subscription['items']['data'][0].plan.id,
'subscription_id': _subscription.id,
}
group_stats_ref.update(_data)
logger.debug('Status customer')
logger.debug(_subscription.get('status', None))
if _subscription.get('status', None) == 'active':
for location in locations:
firebase_db.reference('group_stats/{}'.format(gid)).child('upgraded_locations').update(
{'{}'.format(location.get('name', None)): True})
return make_response(jsonify({'response': 'Plan upgraded!'}), 200)
else:
return make_response(jsonify({'response': 'The transaction failed. Please try again or check with your bank.'}),
200)
@payment.route('/customer/payment_method/update', methods=['POST'])
def customer_cc_update(customer_id_arg=None, new_card_arg=None, self_return=False):
setApiKeyByHeader()
uid = request.headers.get('uid', None)
gid = request.headers.get('gid', None)
if gid is None or uid is None:
logger.debug('Bad headers - UID: {} - GID: {}'.format(uid, gid))
return make_response(jsonify({'error': 'Bad Headers - Unauthorized'}), 401)
data = json.loads(request.data)
customer_id = data.get('customerId', None)
new_card = data.get('newCard', None)
if customer_id_arg is not None:
customer_id = customer_id_arg
if new_card_arg is not None:
new_card = new_card_arg
if customer_id is None or new_card is None:
logger.debug('Bad headers - UID: {} - GID: {}'.format(uid, gid))
return make_response(jsonify({'error': 'Incomplete data'}), 400)
logger.debug('TRYING TO RETRIEVE CUSTOMER TO ADD PAYMENT METHOD')
customer_valid, customer_payload = stripe_get_customer(customer_id)
if customer_valid:
try:
logger.debug('CUSTOMER RETRIEVED')
customer_payload.source = new_card['id']
customer_payload.save()
new_card = customer_payload.sources.data[0]
firebase_db.reference('group_stats/{}'.format(gid)).update({'payment_method': new_card})
if self_return:
return 200, new_card
else:
return make_response(jsonify({'response': new_card}), 200)
except stripe.error.StripeError as stripe_err:
logger.debug('ERROR UPDATING PAYMENT METHOD ON USER')
if isinstance(stripe_err, dict):
body = stripe_err.json_body
stripe_err = body.get('error', {})
logger.debug(str(stripe_err))
if self_return:
return 400, str(stripe_err)
else:
return make_response(jsonify({'error': str(stripe_err)}), 400)
else:
logger.debug('ERROR RETRIEVING CUSTOMER')
logger.debug(str(customer_payload))
if self_return:
return 500, str(customer_payload)
else:
return make_response(jsonify({'error': str(customer_payload)}), 500)
@payment.route('/<string:customer_id>/customer', methods=['GET'])
def get_customer(customer_id):
setApiKeyByHeader()
uid = request.headers.get('uid', None)
gid = request.headers.get('gid', None)
try:
customer = stripe.Customer.retrieve(customer_id)
return make_response(jsonify({'customer': customer}), 200)
except Exception as e:
return make_response(jsonify({'error': 'no have data'}), 400)
@payment.route('/<string:subscription_id>/invoices', methods=['GET'])
def get_customer_invoices(subscription_id):
setApiKeyByHeader()
uid = request.headers.get('uid', None)
gid = request.headers.get('gid', None)
if gid is None or uid is None:
logger.debug('Bad headers - UID: {} - GID: {}'.format(uid, gid))
return make_response(jsonify({'error': 'Bad Headers - Unauthorized'}), 401)
status, payload = stripe_get_subscription_invoices(subscription_id)
if status:
return make_response(jsonify({'invoices': payload}), 200)
else:
_error = str(payload)
return make_response(jsonify({'error': _error}), 400)
@payment.route('/subscription/<string:subscription_id>/usage', methods=['POST'])
def update_subscription_usage(subscription_id):
setApiKeyByHeader()
uid = request.headers.get('uid', None)
gid = request.headers.get('gid', None)
if gid is None or uid is None:
logger.debug('Bad headers - UID: {} - GID: {}'.format(uid, gid))
return make_response(jsonify({'error': 'Bad Headers - Unauthorized'}), 401)
data = json.loads(request.data)
usage = data.get('usage', None)
logger.debug('data usage =>')
logger.debug(data)
logger.debug(usage)
try:
subscription = stripe.Subscription.retrieve(subscription_id)
payload = stripe.Subscription.modify(subscription_id,
cancel_at_period_end=False,
items=[{
'id': subscription['items']['data'][0].id,
'plan': subscription['items']['data'][0].plan.id,
'quantity': usage
}]
)
return make_response(jsonify({'response': payload}), 200)
except Exception as e:
logger.debug('Error response')
logger.debug(e)
return make_response(jsonify({'error': str(e)}), 400)
@payment.route('/test/update_subscription/<string:subscription_id>', methods=['GET'])
def update_subscription_quantity(subscription_id):
setApiKeyByHeader()
if not subscription_id:
return make_response(jsonify({'error': 'no subscription id provided'}), 400)
try:
subscription = stripe.Subscription.retrieve(id=subscription_id)
item = subscription['items']['data'][0]
timestamp = time.mktime(datetime.today().timetuple())
usage = stripe.UsageRecord.create(
quantity=2,
timestamp=int(timestamp),
subscription_item=item.id
)
return make_response(jsonify({'usage': usage}), 200)
except stripe.error.StripeError as stripe_err:
err = stripe_err
if isinstance(stripe_err, dict):
body = stripe_err.json_body
err = body.get('error', {})
return make_response(jsonify({'error': str(err)}), 400)
except Exception as err:
return make_response(jsonify({'error': str(err)}), 400)
def stripe_continue_subscription(subscription_id):
try:
subscription = stripe.Subscription.retrieve(subscription_id)
subscription.coupon = None
subscription.save()
return True, subscription
except stripe.error.StripeError as stripe_err:
err = stripe_err
if isinstance(stripe_err, dict):
body = stripe_err.json_body
err = body.get('error', {})
return False, err
except Exception as err:
return False, err
def stripe_cancel_subscription(subscription_id, pause_subscription=False):
try:
subscription = stripe.Subscription.retrieve(subscription_id)
if pause_subscription:
subscription.coupon = 'free-period'
subscription.save()
else:
subscription.delete()
return True, subscription
except stripe.error.StripeError as stripe_err:
err = stripe_err
if isinstance(stripe_err, dict):
body = stripe_err.json_body
err = body.get('error', {})
return False, err
except Exception as err:
return False, err
def stripe_charge_amount(customer_id):
"""Creating a payment charge"""
try:
invoice = stripe.Invoice.create(
customer=customer_id,
)
invoice.pay()
return True, invoice
except stripe.error.StripeError as stripe_err:
err = stripe_err
if isinstance(stripe_err, dict):
body = stripe_err.json_body
err = body.get('error', {})
return False, err
except Exception as err:
return False, err
def stripe_end_trial(subscription_id):
try:
subscription = stripe.Subscription.retrieve(subscription_id)
subscription.trial_end = 'now'
subscription.save()
return True, subscription
except stripe.error.StripeError as stripe_err:
err = stripe_err
if isinstance(stripe_err, dict):
body = stripe_err.json_body
err = body.get('error', {})
return False, err
except Exception as err:
return False, err
def stripe_get_subscription_invoices(subscription_id):
try:
invoices = stripe.Invoice.list(subscription=subscription_id)
return True, invoices['data']
except stripe.error.StripeError as stripe_err:
err = stripe_err
if isinstance(stripe_err, dict):
body = stripe_err.json_body
err = body.get('error', {})
return False, err
except Exception as err:
return False, err
def stripe_set_usage(subscription_id, quantity, increment=False, invoice_now=False):
try:
subscription = stripe.Subscription.retrieve(id=subscription_id)
item = subscription['items']['data'][0]
timestamp = time.mktime(datetime.today().timetuple())
action = 'set' if increment is False else 'increment'
logger.debug('ACTION TYPE FOR USAGE ON STRIPE')
logger.debug(action)
usage = stripe.UsageRecord.create(
quantity=quantity,
timestamp=int(timestamp),
subscription_item=item.id,
action=action
)
if invoice_now:
subscription.billing_cycle_anchor = 'now'
subscription.save()
return True, usage
except stripe.error.StripeError as stripe_err:
err = stripe_err
if isinstance(stripe_err, dict):
body = stripe_err.json_body
err = body.get('error', {})
return False, err
except Exception as err:
return False, err
def stripe_get_customer(customer_id):
try:
customer = stripe.Customer.retrieve(customer_id)
return True, customer
except stripe.error.StripeError as stripe_err:
return False, stripe_err
except Exception as err:
return False, err
def stripe_create_product_func(name, product_type='service'):
try:
product = stripe.Product.create(
name=name,
type=product_type
)
return True, product
except StandardError as e:
print('ERROR CREATING PRODUCT')
print(str(e))
return False, e
def stripe_create_plan_func(product_id, nickname, interval, currency, amount):
try:
plan = stripe.Plan.create(
product=product_id,
nickname=nickname,
interval=interval,
currency=currency,
amount=amount
)
return True, plan
except StandardError as e:
print('ERROR CREATING PLAN')
print(str(e))
return False, e
def stripe_create_customer_func(email, source=None, gid=None):
try:
data = {'email': email}
if source is not None:
data['source'] = source
if gid is not None:
data['metadata'] = {'gid': gid}
customer = stripe.Customer.create(**data)
return True, customer
except stripe.error.StripeError as e:
body = e.json_body
err = body.get('error', {})
return False, err
except Exception as err:
return False, err
def stripe_create_subscription_func(customer_id, items, gid, trial=False):
timestamp = int(time.mktime(datetime.today().timetuple()))
# set next 1st day of the month to billing
time_now = arrow.utcnow().replace(days=+30)
next_month = time_now.replace(months=+1, day=1).timestamp
try:
data = {'customer': customer_id, 'items': items, 'metadata': {'gid': gid}}
if trial:
data['trial_period_days'] = 30
data['billing_cycle_anchor'] = next_month
subscription = stripe.Subscription.create(**data)
return True, subscription
except stripe.error.StripeError as e:
body = e.json_body
err = body.get('error', {})
return False, err
except Exception as err:
return False, err
def stripe_modify_subscription(subscription_id, plan_id):
setApiKeyByHeader()
try:
stripe_subscription = stripe.Subscription.retrieve(subscription_id)
items = [{
'id': stripe_subscription['items']['data'][0].id,
'plan': plan_id,
}]
updated = stripe.Subscription.modify(subscription_id, cancel_at_period_end=False, items=items)
return True, 'Subscription Updated'
except stripe.error.StripeError as e:
err = e
if isinstance(e, dict):
body = e.json_body
err = body.get('error', {})
return False, err
except Exception as err:
return False, err
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment