Skip to content

Instantly share code, notes, and snippets.

@miku

miku/main.py Secret

Last active September 14, 2023 09:44
Show Gist options
  • Save miku/fb53af7576bc3e5dd1df3ad627db4d07 to your computer and use it in GitHub Desktop.
Save miku/fb53af7576bc3e5dd1df3ad627db4d07 to your computer and use it in GitHub Desktop.
# https://stackoverflow.com/questions/66030105/luigi-sqlite-how-to-update-database-after-initial-load
#
# $ pip install luigi
#
# Run:
#
# $ python main.py && echo "select * from dummy" | sqlite3 -separator $'\t' dummy.db
#
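# After each invocation a new row is added, unless the newest row is less than
# 10 seconds old (see LoadData.complete), in which case nothing is inserted.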
#
# 1 2023-09-14 09:21:03 hello 30 world
# 2 2023-09-14 09:24:03 hello 73 world
# 3 2023-09-14 09:25:33 hello 50 world
import contextlib
import datetime
import random
import sqlite3

import luigi
# Path of the SQLite database file shared by the LoadData task and the script
# entry point.
dbfile = "dummy.db"
# DDL for the single table this example works with: an auto-incrementing id,
# a timestamp column `t` that defaults to the insertion time, and a free-text
# message. IF NOT EXISTS makes it safe to execute on every start.
schema = """
CREATE TABLE IF NOT EXISTS dummy (
id INTEGER PRIMARY KEY AUTOINCREMENT,
t TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
message TEXT
)
"""
class LoadData(luigi.Task):
    """Append one row to the ``dummy`` table, rate-limited by a backoff window.

    The task declares no output target. Instead, ``complete()`` inspects the
    timestamp of the newest row and reports the task as done while that row is
    younger than ``min_interval_seconds``; otherwise ``run()`` inserts a new
    row.
    """

    # Minimum age, in seconds, the newest row must reach before another row is
    # inserted. Override in a subclass to change the backoff window.
    min_interval_seconds = 10

    def run(self):
        """Insert a single row with a random ``hello <n> world`` message."""
        # sqlite3's connection context manager only commits or rolls back; it
        # does not close the connection, so ``closing`` releases the handle.
        with contextlib.closing(sqlite3.connect(dbfile)) as db:
            with db:  # commit on success, roll back on error
                db.execute(
                    """
                    INSERT INTO dummy (message) VALUES (?)
                    """,
                    (f"hello {random.randint(0, 100)} world",),
                )

    # Alternative completion strategies, kept for reference:
    #
    # (1) will run only once
    # def output(self):
    #     return luigi.LocalTarget(dbfile)
    #
    # (2) will run every time the task is called
    # def complete(self):
    #     return False

    def complete(self):
        """Return True while the newest row is younger than the backoff window.

        (3) custom logic, e.g. backoff if the last snapshot happened recently.

        Returns:
            False when the table is empty or the age of the newest row cannot
            be determined, otherwise whether that age in seconds is below
            ``min_interval_seconds``.
        """
        with contextlib.closing(sqlite3.connect(dbfile)) as db:
            # The age is computed in SQL as the difference between the epoch
            # seconds of 'now' and of the stored timestamp `t`.
            row = db.execute(
                """
                select strftime('%s', 'now') - strftime('%s', t)
                from dummy
                order by t desc
                limit 1
                """
            ).fetchone()
        # No rows yet, or `t` was NULL/unparsable so the difference is NULL:
        # there is no usable "last snapshot" time, so let the task run.
        if row is None or row[0] is None:
            return False
        # Last snapshot was added recently -> nothing to do.
        return int(row[0]) < self.min_interval_seconds
if __name__ == "__main__":
    # Create the table up front: the task's completeness check queries it
    # before anything has been inserted.
    # The sqlite3 connection context manager only commits or rolls back, so
    # ``closing`` is used to actually release the connection afterwards.
    with contextlib.closing(sqlite3.connect(dbfile)) as db:
        with db:
            db.execute(schema)
    task = LoadData()
    # Run with the in-process scheduler; no central luigid is required.
    luigi.build([task], local_scheduler=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment