-
-
Save igolaizola/4887f50d24f0075ed46957f2b6157f37 to your computer and use it in GitHub Desktop.
Code snippets from "How I built super-optimized Apify Actors in Go"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package apify | |
| func (c *Client) SaveItems(ctx context.Context, v any) error { | |
| u := fmt.Sprintf("v2/datasets/%s/items", c.dataset) | |
| if _, err := c.do(ctx, "POST", u, v, nil); err != nil { | |
| return fmt.Errorf("apify: couldn't put items: %w", err) | |
| } | |
| return nil | |
| } | |
| func (c *Client) Dataset() string { | |
| return c.dataset | |
| } | |
| func (c *Client) GetInput(ctx context.Context, v any) error { | |
| u := fmt.Sprintf("v2/key-value-stores/%s/records/INPUT", c.key) | |
| if _, err := c.do(ctx, "GET", u, nil, v); err != nil { | |
| return fmt.Errorf("apify: couldn't get input: %w", err) | |
| } | |
| return nil | |
| } | |
| func (c *Client) PutKeyValue(ctx context.Context, key, mime string, b []byte) error { | |
| u := fmt.Sprintf("v2/key-value-stores/%s/records/%s", c.key, key) | |
| bl := &blob{ | |
| contentType: mime, | |
| reader: bytes.NewReader(b), | |
| } | |
| if _, err := c.do(ctx, "PUT", u, bl, nil); err != nil { | |
| return fmt.Errorf("apify: couldn't put key-value: %w", err) | |
| } | |
| return nil | |
| } | |
| func (c *Client) KeyValueURL(key string) string { | |
| return fmt.Sprintf("https://api.apify.com/v2/key-value-stores/%s/records/%s", c.key, key) | |
| } | |
| type actorResponse struct { | |
| Data Actor `json:"data"` | |
| } | |
// Actor holds the subset of the actor object that this package reads:
// its identity and its pricing entries.
// See https://docs.apify.com/api/v2/act-get
type Actor struct {
	ID   string `json:"id"`
	Name string `json:"name"`
	// PricingInfos lists the actor's pricing entries. The explicit tag keeps
	// the encoded key ("pricingInfos") consistent with the other camelCase
	// keys; decoding already matched it case-insensitively.
	PricingInfos []struct {
		PricingModel    string `json:"pricingModel"`
		PricingPerEvent struct {
			// ActorChargeEvents maps an event name to its price and
			// descriptive text.
			ActorChargeEvents map[string]struct {
				EventPriceUSD    float64 `json:"eventPriceUsd"`
				EventTitle       string  `json:"eventTitle"`
				EventDescription string  `json:"eventDescription"`
			} `json:"actorChargeEvents"`
		} `json:"pricingPerEvent"`
	} `json:"pricingInfos"`
}
| // GetActor retrieves the actor information based on the actorID. | |
| // See https://docs.apify.com/api/v2/act-get | |
| func (c *Client) GetActor() (*Actor, error) { | |
| u := fmt.Sprintf("v2/acts/%s", c.actorID) | |
| var resp actorResponse | |
| if _, err := c.do(context.Background(), "GET", u, nil, &resp); err != nil { | |
| return nil, fmt.Errorf("apify: couldn't get actor: %w", err) | |
| } | |
| return &resp.Data, nil | |
| } | |
// chargeRequest is the JSON body AddCharge posts to the run's charge
// endpoint: the event being charged and how many times.
type chargeRequest struct {
	EventName string `json:"eventName"`
	Count     int    `json:"count"`
}
| // AddCharge adds a charge for the given event and count. | |
| func (c *Client) AddCharge(event string, count int) error { | |
| // Default dataset item events are charged automatically by Apify | |
| if event != DatasetItemEvent { | |
| u := fmt.Sprintf("v2/actor-runs/%s/charge", c.runID) | |
| req := &chargeRequest{ | |
| EventName: event, | |
| Count: count, | |
| } | |
| if _, err := c.do(context.Background(), "POST", u, req, nil); err != nil { | |
| return fmt.Errorf("apify: couldn't add charge: %w", err) | |
| } | |
| } | |
| // Update the charged amount | |
| if price, ok := c.prices[event]; ok { | |
| c.charged += price * float64(count) | |
| } | |
| return nil | |
| } | |
| // Check if the max charge has been reached. | |
| func (c *Client) MaxChargeReached() bool { | |
| return c.isPPE && c.maxCharge > 0 && c.charged >= c.maxCharge | |
| } | |
| // MaxEvents returns the maximum number of events of the given type that can be | |
| // processed without exceeding the max charge. If the client is not in PPE mode | |
| // or the event type is not charged, it returns (0, false). | |
| func (c *Client) MaxEvents(ev string) (int, bool) { | |
| if !c.isPPE { | |
| return 0, false | |
| } | |
| price, ok := c.prices[ev] | |
| if !ok || price == 0 { | |
| return 0, false | |
| } | |
| // Max events is the max charge divided by the price per event. | |
| return int(c.maxCharge / price), true | |
| } | |
| // MaxResults is a convenience wrapper around MaxEvents for the "result" event. | |
| func (c *Client) MaxResults() (int, bool) { | |
| return c.MaxEvents(c.resultEvent) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package apify | |
// Names of the pay-per-event charge events this package refers to.
const (
	// ResultEvent is the custom per-result event that this package charges
	// explicitly.
	ResultEvent = "result"
	// StartEvent is the actor-start event, charged automatically by Apify.
	StartEvent = "apify-actor-start"
	// DatasetItemEvent is the default dataset item event, charged
	// automatically by Apify.
	DatasetItemEvent = "apify-default-dataset-item"
)
// Client talks to the Apify API on behalf of a running actor and keeps
// local track of pay-per-event charges. Build one with NewActor.
type Client struct {
	debug  bool         // debug flag passed to NewActor
	client *http.Client // HTTP client used for API calls
	token  string       // value of APIFY_TOKEN

	key     string // default key-value store ID
	dataset string // default dataset ID
	actorID string // ID of the actor
	runID   string // ID of the current actor run

	// PPE related fields
	isPPE       bool               // true when APIFY_PPE is "1"
	resultEvent string             // event name charged per saved result
	prices      map[string]float64 // event name -> price in USD
	maxCharge   float64            // max total charge in USD; 0 when unset
	charged     float64            // locally tracked amount charged so far
}
| func NewActor(debug bool) (*Client, error) { | |
| token := os.Getenv("APIFY_TOKEN") | |
| if token == "" { | |
| return nil, errors.New("missing APIFY_TOKEN") | |
| } | |
| key := os.Getenv("APIFY_DEFAULT_KEY_VALUE_STORE_ID") | |
| if key == "" { | |
| return nil, errors.New("missing APIFY_DEFAULT_KEY_VALUE_STORE_ID") | |
| } | |
| dataset := os.Getenv("APIFY_DEFAULT_DATASET_ID") | |
| if dataset == "" { | |
| return nil, errors.New("missing APIFY_DEFAULT_DATASET_ID") | |
| } | |
| actorID := os.Getenv("APIFY_ACTOR_ID") | |
| if actorID == "" { | |
| return nil, errors.New("missing APIFY_ACTOR_ID") | |
| } | |
| runID := os.Getenv("APIFY_ACTOR_RUN_ID") | |
| if runID == "" { | |
| return nil, errors.New("missing APIFY_ACTOR_RUN_ID") | |
| } | |
| client := &http.Client{ | |
| Timeout: time.Second * 30, | |
| } | |
| c := &Client{ | |
| debug: debug, | |
| client: client, | |
| token: token, | |
| key: key, | |
| dataset: dataset, | |
| actorID: actorID, | |
| runID: runID, | |
| isPPE: false, | |
| resultEvent: "", | |
| prices: make(map[string]float64), | |
| charged: 0, | |
| } | |
| // See https://docs.apify.com/platform/actors/publishing/monetize/pay-per-event | |
| if os.Getenv("APIFY_PPE") == "1" { | |
| slog.Info("This actor is running in pay-per-event (PPE) mode") | |
| c.isPPE = true | |
| // Get max charge if set | |
| if v := os.Getenv("ACTOR_MAX_TOTAL_CHARGE_USD"); v != "" { | |
| maxCharge, err := strconv.ParseFloat(v, 64) | |
| if err != nil { | |
| return nil, fmt.Errorf("apify: couldn't parse ACTOR_MAX_TOTAL_CHARGE_USD (%s): %w", v, err) | |
| } | |
| c.maxCharge = maxCharge | |
| } | |
| // Get actor pricing info | |
| actor, err := c.GetActor() | |
| if err != nil { | |
| return nil, fmt.Errorf("apify: couldn't get actor info: %w", err) | |
| } | |
| c.resultEvent = DatasetItemEvent | |
| for _, info := range actor.PricingInfos { | |
| for name, evt := range info.PricingPerEvent.ActorChargeEvents { | |
| c.prices[name] = evt.EventPriceUSD | |
| switch name { | |
| case "apify-actor-start": | |
| // Actor start is charged automatically by Apify | |
| c.charged += evt.EventPriceUSD | |
| case ResultEvent: | |
| // Results are charged manually by us | |
| c.resultEvent = name | |
| } | |
| } | |
| } | |
| } | |
| return c, nil | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package apify | |
| type Saver[T any] struct { | |
| total int | |
| file *jsonFile | |
| } | |
| func (s *Saver[T]) Total() int { | |
| return s.total | |
| } | |
| func (s *Saver[T]) Save(ap *Client, output string, current []T) error { | |
| ctx, cancel := context.WithTimeout(context.Background(), time.Minute) | |
| defer cancel() | |
| s.total += len(current) | |
| slog.Info("📦 Saved results", "current", len(current), "total", s.total) | |
| if len(current) == 0 { | |
| return nil | |
| } | |
| if ap != nil { | |
| if ap.MaxChargeReached() { | |
| return errors.New("apify: max charge reached") | |
| } | |
| // Just send the typed slice; the client will JSON-encode it. | |
| if err := ap.SaveItems(ctx, current); err != nil { | |
| return err | |
| } | |
| // Charge for results if the client is in PPE mode | |
| if ap.isPPE { | |
| if err := ap.AddCharge(ap.resultEvent, len(current)); err != nil { | |
| return err | |
| } | |
| } | |
| return nil | |
| } | |
| chunks := make([][]byte, len(current)) | |
| for i, item := range current { | |
| b, err := json.MarshalIndent(item, "", " ") | |
| if err != nil { | |
| return err | |
| } | |
| chunks[i] = indentJSON(b, " ") | |
| } | |
| if s.file == nil || s.file.path != output { | |
| s.file = &jsonFile{path: output} | |
| } | |
| return s.file.append(chunks) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment