Skip to content

Instantly share code, notes, and snippets.

Created February 2, 2024 17:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save judell/428ff0d064be960f33463ab9dacf9f5d to your computer and use it in GitHub Desktop.
Save judell/428ff0d064be960f33463ab9dacf9f5d to your computer and use it in GitHub Desktop.

Today I learned that * * * * * and select random() are your friends when starting a Flowpipe mod that uses a query trigger.

* * * * * to iterate as fast as possible: every minute

select random() to ensure there is always fresh data, otherwise nothing will happen

If you're trying to debug something downstream, like sending email, this ensures you'll actually test the downstream thing every time.

Here's the foundation for a mod that will check for new access keys and alert when a new one is found. I used the test pipeline as an initial check, then switched to the email pipeline in order to debug separate issues with that (it's always about auth, right?) once I knew the pipeline would receive a message every minute.

When it's running I can see the message in three places: the console (using output "test"), the email, and also in flowpipe.db, which lives at the mod's root.


Now that all the moving parts are aligned, I can switch to the real query, like this, that will do nothing until there's a newly-created key.

  create_date > now() - interval '1 day'  ```

I always have to remind myself to start with the simplest things and expand incrementally. That's especially important when building/debugging/testing workflows which are notoriously fiddly creatures.

Here's today's little test harness, in case helpful to anyone.

```trigger "query" "aws_iam_access_key" {
  connection_string = "postgres://steampipe@localhost:9193/steampipe"
  schedule = "* * * * *"
  primary_key = "message"
  sql = <<EOQ
    select random()::text as message

  capture "insert" {
    pipeline =

    args = {
      message = self.inserted_rows[0].message


pipeline "test" {

  param "message" {
    type = string

  output "test" {
    value = param.message

pipeline "email" {

  param "message" {
    type = string

  output "test" {
    value = param.message

  step "email" "send_it" {
      to                = [""]
      from              = ""
      smtp_username     = ""
      smtp_password     = env("FASTMAIL_PW")
      host              = ""
      port              = 587
      subject           = "new access key"
      content_type      = "text/html"
      body              = <<EOT
        "New access key ${param.message}"

See for more.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment