-
-
Save miku/fb53af7576bc3e5dd1df3ad627db4d07 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://stackoverflow.com/questions/66030105/luigi-sqlite-how-to-update-database-after-initial-load | |
# | |
# $ pip install luigi | |
# | |
# Run: | |
# | |
# $ python main.py && echo "select * from dummy" | sqlite3 -separator $'\t' dummy.db | |
# | |
# After each invocation a new row is added. | |
# | |
# 1 2023-09-14 09:21:03 hello 30 world | |
# 2 2023-09-14 09:24:03 hello 73 world | |
# 3 2023-09-14 09:25:33 hello 50 world | |
import datetime | |
import luigi | |
import sqlite3 | |
import random | |
dbfile = "dummy.db" | |
schema = """ | |
CREATE TABLE IF NOT EXISTS dummy ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
t TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
message TEXT | |
) | |
""" | |
class LoadData(luigi.Task): | |
def run(self): | |
with sqlite3.connect(dbfile) as db: | |
cursor = db.cursor() | |
cursor.execute( | |
""" | |
INSERT INTO dummy (message) VALUES (?) | |
""", | |
(f"hello {random.randint(0, 100)} world",), | |
) | |
# def output(self): | |
# # (1) will run only once | |
# return luigi.LocalTarget(dbfile) | |
# def complete(self): | |
# # (2) will run every time the task is called | |
# return False | |
def complete(self): | |
# (3) custom logic, e.g. backoff if the last snapshot happened recently | |
with sqlite3.connect(dbfile) as db: | |
cursor = db.cursor() | |
cursor.execute( | |
""" | |
select strftime('%s', 'now') - strftime('%s', t) | |
from dummy | |
order by t desc | |
limit 1 | |
""" | |
) | |
row = cursor.fetchone() | |
if row is None: | |
return False | |
diff_s = row[0] # 2023-09-14 09:25:33 | |
return int(diff_s) < 10 # last snapshot was recently added, not doing anything | |
if __name__ == "__main__": | |
with sqlite3.connect(dbfile) as db: | |
db.execute(schema) | |
task = LoadData() | |
luigi.build([task], local_scheduler=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
luigi==3.3.0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment