Skip to content

Instantly share code, notes, and snippets.

@simonl2002
Last active December 13, 2022 21:44
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Embed
What would you like to do?
Our Salesforce App Code
package main
import (
"log"
"os"
"strconv"
"github.com/meroxa/turbine-go"
"github.com/meroxa/turbine-go/runner"
)
func main() {
runner.Start(App{})
}
// Compile-time assertion that *App satisfies the turbine.App interface.
var _ turbine.App = (*App)(nil)
// App is the Turbine application defined by this file. It carries no state;
// its behavior lives in the Run method.
type App struct{}
// Run describes the pipeline to Turbine:
//
//  1. look up the "dwh" resource and read records from its "tablename"
//     collection using the connector settings below,
//  2. register the Stripe and Salesforce secrets,
//  3. run the records through WriteToSalesforce.
//
// The first error returned by Resources, Records, or RegisterSecret is
// returned unchanged. The result of Process is intentionally discarded.
func (a App) Run(v turbine.Turbine) error {
	source, err := v.Resources("dwh")
	if err != nil {
		return err
	}

	// Connector settings handed to Records for the source collection.
	sourceConfigs := turbine.ResourceConfigs{
		turbine.ResourceConfig{Field: "table.types", Value: "VIEW"},
		turbine.ResourceConfig{Field: "incrementing.column.name", Value: "account_id"},
		turbine.ResourceConfig{Field: "validate.non.null", Value: "false"},
	}
	records, err := source.Records("tablename", sourceConfigs)
	if err != nil {
		return err
	}

	// Secrets are registered in this order; registration stops at the first
	// failure.
	secretNames := []string{
		// Stripe API Key
		"STRIPE_API_KEY",
		// SFDC config
		"SALESFORCE_INSTANCE_URL",
		"SALESFORCE_CLIENT_ID",
		"SALESFORCE_CLIENT_SECRET",
		"SALESFORCE_USER",
		"SALESFORCE_PASSWORD",
		"SALESFORCE_TOKEN",
	}
	for _, name := range secretNames {
		if err := v.RegisterSecret(name); err != nil {
			return err
		}
	}

	// Not writing the results anywhere
	v.Process(records, WriteToSalesforce{})
	return nil
}
// RecordToProductData builds a ProductData from the payload of a Turbine
// record.
//
// Payload fields read: account_id and account_created_at (numeric), and
// user_email, user_given_name, user_family_name, plan_name and
// stripe_subscription_id (strings). subscriptionStatus is left empty for the
// caller to fill in.
//
// A field that is missing, null, or of an unexpected type yields an empty
// string instead of panicking, so one malformed record cannot crash the whole
// batch. Callers that require a value (e.g. accountId) should check for "".
func RecordToProductData(r turbine.Record) ProductData {
	// text reads a string field; anything that is not a string becomes "".
	text := func(field string) string {
		s, _ := r.Payload.Get(field).(string)
		return s
	}
	// integer reads a numeric field and renders its integer part in base 10.
	// gjson returns all JSON numeric types as float64. The conversion goes
	// through int64 so large values (such as epoch timestamps) are not
	// truncated on platforms where int is 32 bits wide.
	integer := func(field string) string {
		f, ok := r.Payload.Get(field).(float64)
		if !ok {
			return ""
		}
		return strconv.FormatInt(int64(f), 10)
	}

	return ProductData{
		accountId:            integer("account_id"),
		email:                text("user_email"),
		givenName:            text("user_given_name"),
		familyName:           text("user_family_name"),
		planName:             text("plan_name"),
		stripeSubscriptionId: text("stripe_subscription_id"),
		accountCreatedAt:     integer("account_created_at"),
	}
}
// WriteToSalesforce is the stateless processing step passed to Turbine's
// Process call; its Process method does the per-record work.
type WriteToSalesforce struct{}
// Process looks up the Stripe subscription status for every record and
// upserts the resulting ProductData into Salesforce.
//
// Credentials are read from the environment: STRIPE_API_KEY,
// SALESFORCE_INSTANCE_URL, SALESFORCE_CLIENT_ID, SALESFORCE_USER,
// SALESFORCE_PASSWORD and SALESFORCE_TOKEN. If the Salesforce updater cannot
// be created the process is terminated via log.Fatalf.
//
// Per-record failures (Stripe lookup or Salesforce upsert) are logged and the
// record is skipped; they do not abort the batch. The input records are
// returned unmodified.
func (f WriteToSalesforce) Process(rr []turbine.Record) []turbine.Record {
	// setup subscription fetching
	subscriptionFetcher := newBasicStripeFetcher(os.Getenv("STRIPE_API_KEY"))

	// setup sfdc updater
	salesforceUpdater, err := newBasicSalesforceUpdater(
		os.Getenv("SALESFORCE_INSTANCE_URL"),
		os.Getenv("SALESFORCE_CLIENT_ID"),
		os.Getenv("SALESFORCE_USER"),
		os.Getenv("SALESFORCE_PASSWORD"),
		os.Getenv("SALESFORCE_TOKEN"),
	)
	if err != nil {
		// Include the cause so the failure can be diagnosed from the log.
		log.Fatalf("ERROR: salesforce updater creation failed: %v", err)
	}

	for _, r := range rr {
		pd := RecordToProductData(r)

		// pd already carries the subscription id read from the payload, so
		// there is no need to read (and type-assert) the field a second time.
		subscriptionStatus, err := subscriptionFetcher.fetchSubscriptionStatus(pd.stripeSubscriptionId)
		if err != nil {
			log.Printf("ERROR: fetching subscription status for account %s failed: %v", pd.accountId, err)
			continue
		}
		log.Println("Subscription Status is: " + subscriptionStatus)
		pd.subscriptionStatus = subscriptionStatus

		// The upsert is keyed by account id; report failures instead of
		// silently dropping them.
		if err := salesforceUpdater.updateProductInstance(pd); err != nil {
			log.Printf("ERROR: updating product instance for account %s failed: %v", pd.accountId, err)
		}
	}

	// return original records unmodified
	return rr
}
package main
import (
"errors"
"fmt"
"log"
"github.com/simpleforce/simpleforce"
)
// ProductData is the flat, all-string view of one account that is written to
// the Product_Instance__c object in Salesforce.
type ProductData struct {
	// Account identifier and the admin's e-mail address.
	accountId, email string
	// Admin's given and family name.
	givenName, familyName string
	// Name of the plan; stored in the Product__c field.
	planName string
	// Stripe subscription id and the status stored for that subscription.
	stripeSubscriptionId, subscriptionStatus string
	// Account creation value; stored in Workspace_Created_At__c.
	accountCreatedAt string
}
// SalesforceUpdater is implemented by anything that can write a ProductData
// to Salesforce.
type SalesforceUpdater interface {
// updateProductInstance writes data to Salesforce and reports failure as an error.
updateProductInstance(data ProductData) error
}
// BasicSalesforceUpdater talks to Salesforce through a simpleforce client.
// Create it with newBasicSalesforceUpdater, which also logs the client in.
type BasicSalesforceUpdater struct {
// client is the simpleforce client used for queries and upserts.
client *simpleforce.Client
}
// newBasicSalesforceUpdater creates a simpleforce client for instanceUrl and
// clientId (using simpleforce.DefaultAPIVersion) and logs in with the given
// user, password and security token.
//
// On failure it returns the zero BasicSalesforceUpdater and an error. A login
// failure wraps the underlying simpleforce error so callers can log or inspect
// the real cause instead of a fixed message.
func newBasicSalesforceUpdater(instanceUrl string, clientId string, user string, password string, token string) (BasicSalesforceUpdater, error) {
	client := simpleforce.NewClient(instanceUrl, clientId, simpleforce.DefaultAPIVersion)
	if client == nil {
		return BasicSalesforceUpdater{}, errors.New("Unable to create Salesforce client")
	}
	if err := client.LoginPassword(user, password, token); err != nil {
		return BasicSalesforceUpdater{}, fmt.Errorf("Unable to login to Salesforce: %w", err)
	}
	return BasicSalesforceUpdater{client: client}, nil
}
// query looks up the Product_Instance__c row whose Workspace_Id__c equals
// data.accountId and logs the admin first name found on it.
//
// It returns an error when the query itself fails, when the result does not
// contain exactly one record, when that record is nil, or when the
// Admin_First_Name__c field is empty. Nothing from the record is returned.
//
// NOTE(review): data.accountId is interpolated into the SOQL text as-is;
// confirm callers only ever pass trusted values.
func (bsu BasicSalesforceUpdater) query(data ProductData) error {
	const soqlTemplate = "SELECT FIELDS(ALL) FROM Product_Instance__c WHERE Workspace_Id__c = '%s' LIMIT 1"

	res, err := bsu.client.Query(fmt.Sprintf(soqlTemplate, data.accountId))
	if err != nil {
		return err
	}
	if len(res.Records) != 1 {
		return errors.New("unexpected query result")
	}

	instance := res.Records[0]
	if instance == nil {
		return errors.New("No Product Instance Found")
	}
	log.Println("Found product Instance")

	adminFirstName := instance.StringField("Admin_First_Name__c")
	if adminFirstName == "" {
		return errors.New("Couldn't fetch first name")
	}
	log.Println("Found product instance with admin name: " + adminFirstName)
	return nil
}
// updateProductInstance upserts a Product_Instance__c object built from data,
// using Workspace_Id__c as the external id field. It logs the account id
// first and returns an error when Upsert yields nil.
func (bsu BasicSalesforceUpdater) updateProductInstance(data ProductData) error {
	log.Println("account Id is: " + data.accountId)

	// Field assignments, applied in this order before the upsert.
	assignments := []struct{ key, value string }{
		{"ExternalIDField", "Workspace_Id__c"},
		{"Workspace_Id__c", data.accountId},
		{"Name", "Org: " + data.accountId},
		{"Admin_Email__c", data.email},
		{"Admin_First_Name__c", data.givenName},
		{"Admin_Last_Name__c", data.familyName},
		{"Product__c", data.planName},
		{"Stripe_Subscription_Id__c", data.stripeSubscriptionId},
		{"Subscription_Status__c", data.subscriptionStatus},
		{"Workspace_Created_At__c", data.accountCreatedAt},
	}

	record := bsu.client.SObject("Product_Instance__c")
	for _, a := range assignments {
		record = record.Set(a.key, a.value)
	}

	if record.Upsert() == nil {
		return errors.New("upsert failed!")
	}
	return nil
}
package main
import (
"errors"
"fmt"
"log"
"github.com/simpleforce/simpleforce"
)
// ProductData is the flat, all-string view of one account that is written to
// the Product_Instance__c object in Salesforce.
type ProductData struct {
	// Account identifier and the admin's e-mail address.
	accountId, email string
	// Admin's given and family name.
	givenName, familyName string
	// Name of the plan; stored in the Product__c field.
	planName string
	// Stripe subscription id and the status stored for that subscription.
	stripeSubscriptionId, subscriptionStatus string
	// Account creation value; stored in Workspace_Created_At__c.
	accountCreatedAt string
}
// SalesforceUpdater is implemented by anything that can write a ProductData
// to Salesforce.
type SalesforceUpdater interface {
// updateProductInstance writes data to Salesforce and reports failure as an error.
updateProductInstance(data ProductData) error
}
// BasicSalesforceUpdater talks to Salesforce through a simpleforce client.
// Create it with newBasicSalesforceUpdater, which also logs the client in.
type BasicSalesforceUpdater struct {
// client is the simpleforce client used for queries and upserts.
client *simpleforce.Client
}
// newBasicSalesforceUpdater creates a simpleforce client for instanceUrl and
// clientId (using simpleforce.DefaultAPIVersion) and logs in with the given
// user, password and security token.
//
// On failure it returns the zero BasicSalesforceUpdater and an error. A login
// failure wraps the underlying simpleforce error so callers can log or inspect
// the real cause instead of a fixed message.
func newBasicSalesforceUpdater(instanceUrl string, clientId string, user string, password string, token string) (BasicSalesforceUpdater, error) {
	client := simpleforce.NewClient(instanceUrl, clientId, simpleforce.DefaultAPIVersion)
	if client == nil {
		return BasicSalesforceUpdater{}, errors.New("Unable to create Salesforce client")
	}
	if err := client.LoginPassword(user, password, token); err != nil {
		return BasicSalesforceUpdater{}, fmt.Errorf("Unable to login to Salesforce: %w", err)
	}
	return BasicSalesforceUpdater{client: client}, nil
}
// query looks up the Product_Instance__c row whose Workspace_Id__c equals
// data.accountId and logs the admin first name found on it.
//
// It returns an error when the query itself fails, when the result does not
// contain exactly one record, when that record is nil, or when the
// Admin_First_Name__c field is empty. Nothing from the record is returned.
//
// NOTE(review): data.accountId is interpolated into the SOQL text as-is;
// confirm callers only ever pass trusted values.
func (bsu BasicSalesforceUpdater) query(data ProductData) error {
	const soqlTemplate = "SELECT FIELDS(ALL) FROM Product_Instance__c WHERE Workspace_Id__c = '%s' LIMIT 1"

	res, err := bsu.client.Query(fmt.Sprintf(soqlTemplate, data.accountId))
	if err != nil {
		return err
	}
	if len(res.Records) != 1 {
		return errors.New("unexpected query result")
	}

	instance := res.Records[0]
	if instance == nil {
		return errors.New("No Product Instance Found")
	}
	log.Println("Found product Instance")

	adminFirstName := instance.StringField("Admin_First_Name__c")
	if adminFirstName == "" {
		return errors.New("Couldn't fetch first name")
	}
	log.Println("Found product instance with admin name: " + adminFirstName)
	return nil
}
// updateProductInstance upserts a Product_Instance__c object built from data,
// using Workspace_Id__c as the external id field. It logs the account id
// first and returns an error when Upsert yields nil.
func (bsu BasicSalesforceUpdater) updateProductInstance(data ProductData) error {
	log.Println("account Id is: " + data.accountId)

	// Field assignments, applied in this order before the upsert.
	assignments := []struct{ key, value string }{
		{"ExternalIDField", "Workspace_Id__c"},
		{"Workspace_Id__c", data.accountId},
		{"Name", "Org: " + data.accountId},
		{"Admin_Email__c", data.email},
		{"Admin_First_Name__c", data.givenName},
		{"Admin_Last_Name__c", data.familyName},
		{"Product__c", data.planName},
		{"Stripe_Subscription_Id__c", data.stripeSubscriptionId},
		{"Subscription_Status__c", data.subscriptionStatus},
		{"Workspace_Created_At__c", data.accountCreatedAt},
	}

	record := bsu.client.SObject("Product_Instance__c")
	for _, a := range assignments {
		record = record.Set(a.key, a.value)
	}

	if record.Upsert() == nil {
		return errors.New("upsert failed!")
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment