Last active
April 21, 2021 12:59
-
-
Save daisy1754/7e8213cd0496711abda44390939c9a99 to your computer and use it in GitHub Desktop.
dataflow_example.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// note this stays outside of updateDatabaseFn struct. | |
// fields in pardo has to be json-serielizable so you cannot have db instance as field | |
var database *gorm.DB | |
// updateDatabaseFn is the DoFn registered with Beam in init. It carries only
// plain string settings, each with an explicit JSON field name; both values
// are handed to initializeDatabase when a bundle starts.
type updateDatabaseFn struct {
	// Env is passed to initializeDatabase as its environment argument.
	Env string `json:"env"`
	// Project is passed to initializeDatabase as its project argument.
	Project string `json:"project"`
}
func init() { | |
beam.RegisterDoFn(reflect.TypeOf((*updateDatabaseFn)(nil)).Elem()) | |
} | |
// initialize connection here | |
func (f *updateDatabaseFn) StartBundle(_ctx context.Context) error { | |
var err error | |
database, err = initializeDatabase(context.Background(), f.Env, f.Project) | |
if err != nil { | |
return errors.Wrap(err, "failed to connect to database") | |
} | |
return nil | |
} | |
// use your db connection here | |
func (f *updateDatabaseFn) ProcessElement(ctx context.Context, data SomeData) { | |
// some data processing here | |
err := insertRecords(database, data) | |
// .. more logic here |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment