Skip to content

Instantly share code, notes, and snippets.

@daisy1754
Last active April 21, 2021 12:59
Show Gist options
  • Save daisy1754/7e8213cd0496711abda44390939c9a99 to your computer and use it in GitHub Desktop.
Save daisy1754/7e8213cd0496711abda44390939c9a99 to your computer and use it in GitHub Desktop.
dataflow_example.go
// database is deliberately declared at package level, outside of the
// updateDatabaseFn struct: fields of a ParDo struct have to be
// JSON-serializable, so a DB instance cannot be stored as a field.
var database *gorm.DB
// updateDatabaseFn is the DoFn struct registered with Beam in init. It holds
// only plain string settings, each carrying a JSON tag; the database handle
// itself lives in the package-level database variable.
type updateDatabaseFn struct {
	// Env is forwarded to initializeDatabase by StartBundle.
	Env string `json:"env"`
	// Project is forwarded to initializeDatabase by StartBundle.
	Project string `json:"project"`
}
func init() {
beam.RegisterDoFn(reflect.TypeOf((*updateDatabaseFn)(nil)).Elem())
}
// initialize connection here
func (f *updateDatabaseFn) StartBundle(_ctx context.Context) error {
 var err error
 database, err = initializeDatabase(context.Background(), f.Env, f.Project)
 if err != nil {
 return errors.Wrap(err, "failed to connect to database")
 }
 return nil
}
// use your db connection here
func (f *updateDatabaseFn) ProcessElement(ctx context.Context, data SomeData) {
 // some data processing here
 err := insertRecords(database, data)
// .. more logic here
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment