Skip to content

Instantly share code, notes, and snippets.

@judell
Created February 2, 2024 17:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save judell/428ff0d064be960f33463ab9dacf9f5d to your computer and use it in GitHub Desktop.
Save judell/428ff0d064be960f33463ab9dacf9f5d to your computer and use it in GitHub Desktop.
workflow-test-setup

Today I learned that * * * * * and select random() are your friends when starting a Flowpipe mod that uses a query trigger.

* * * * * to iterate as fast as possible: every minute

select random() to ensure there is always fresh data, otherwise nothing will happen

If you're trying to debug something downstream, like sending email, this ensures you'll actually test the downstream thing every time.

Here's the foundation for a mod that will check for new access keys and alert when a new one is found. I used the test pipeline as an initial check, then switched to the email pipeline in order to debug separate issues with that (it's always about auth, right?) once I knew the pipeline would receive a message every minute.

When it's running I can see the message in three places: the console (using output "test"), the email, and also in flowpipe.db, which lives at the mod's root.

trigger_name|primary_key|row_hash|created_at|updated_at
local_trigger_query_aws_iam_access_key|0.9426633829030742|3ea6211b6d8c2ceb097e1ced6f788e8d2e15771dfa7f78a33c48154590ce4c19|2024-02-02T03:12:00.004Z|```

Now that all the moving parts are aligned, I can switch to the real query, like this, that will do nothing until there's a newly-created key.

```select
  *
from
  aws_iam_access_key
where
  create_date > now() - interval '1 day'  ```

I always have to remind myself to start with the simplest things and expand incrementally. That's especially important when building/debugging/testing workflows which are notoriously fiddly creatures.

Here's today's little test harness, in case helpful to anyone.

```trigger "query" "aws_iam_access_key" {
  connection_string = "postgres://steampipe@localhost:9193/steampipe"
  schedule = "* * * * *"
  primary_key = "message"
  sql = <<EOQ
    select random()::text as message
  EOQ

  capture "insert" {
    pipeline = pipeline.email

    args = {
      message = self.inserted_rows[0].message
    }
  }

}

pipeline "test" {

  param "message" {
    type = string
  }

  output "test" {
    value = param.message
  }
}

pipeline "email" {

  param "message" {
    type = string
  }

  output "test" {
    value = param.message
  }

  step "email" "send_it" {
      to                = ["jon@jonudell.info"]
      from              = "jon@jonudell.info"
      smtp_username     = "judell@fastmail.com"
      smtp_password     = env("FASTMAIL_PW")
      host              = "smtp.fastmail.com"
      port              = 587
      subject           = "new access key"
      content_type      = "text/html"
      body              = <<EOT
        "New access key ${param.message}"
      EOT
  }
}```

See https://flowpipe.io/docs/flowpipe-hcl/trigger/query for more.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment