@kapcod
Created October 10, 2021 18:27
Airflow DAG log cleaner. Assumes the log structure /var/log/airflow/<dag>/<operator>/<date>. Run `cleanup_airflow_logs.rb 7` to retain only the last week of logs.
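To make the assumed layout concrete, here is a minimal sketch that splits one log path into its parts. The DAG, operator, and date names in the sample path are made up for illustration; only the /var/log/airflow/<dag>/<operator>/<date> shape comes from the description.

```ruby
# Hypothetical sample path; the DAG, operator and date names are made up.
path = '/var/log/airflow/daily_export/load_table/2021-10-03T00:00:00+00:00'

# The leading '/' yields an empty first element, so the DAG name sits at index 4.
_root, _var, _log, _airflow, dag, operator, date = path.split('/')

puts "dag=#{dag} operator=#{operator} date=#{date}"
```

The last component is the part the cleaner parses as a timestamp, so a folder whose name is not a date falls outside this assumption.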
#!/usr/bin/env ruby
# frozen_string_literal: true

require 'fileutils'
require 'time'

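# Deletes Airflow log folders older than a retention window.
# Assumes the layout /var/log/airflow/<dag>/<operator>/<date>.
class AirflowLogCleaner
  attr_reader :list, :to_delete

  # Collects every <date> folder once and memoizes the result.
  def list_folders
    @list ||= Dir['/var/log/airflow/*/*/*']
  end
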
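  # Marks folders whose <date> name parses to a time before the cutoff.
  def select_to_delete(retention_days)
    puts "Scanning #{list_folders.size} folders..."
    threshold = Time.now - retention_days * 24 * 3600
    @to_delete = list_folders.select do |dir|
      Time.parse(File.basename(dir)) < threshold
    rescue ArgumentError
      # Keep folders whose name is not a parseable date instead of aborting the scan.
      false
    end
  end
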
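  # Removes the selected folders, pausing periodically to limit disk I/O.
  def delete!
    puts "Deleting #{to_delete.size} folders..."
    to_delete.each_with_index do |dir, index|
      FileUtils.rm_r(dir)
      # Pause 1s after every 1000 folders to cap the deletion rate.
      sleep 1 if ((index + 1) % 1000).zero?
      puts "Deleted #{index + 1}/#{to_delete.size}" if ((index + 1) % 10_000).zero?
    rescue StandardError => e
      # Report the failure and continue with the next folder.
      puts "Error for #{dir}: #{e}"
    end
  end
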
  # Runs the full list, select, delete cycle and reports the elapsed time.
  def perform(retention_days)
    start_ts = Time.now.to_i
    puts 'Pulling folders list'
    list_folders
    select_to_delete(retention_days)
    delete!
    puts "Done. Took #{Time.now.to_i - start_ts}s"
  end

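  # Returns { dag => { operator => number_of_date_folders } }.
  def count_by_op
    list_folders
      .map { |dir| dir.split('/')[4..] }
      .group_by { |dag, _op, _date| dag }
      .transform_values { |rows| rows.group_by { |_dag, op, _date| op }.transform_values(&:size) }
  end

  # Returns { dag => total_number_of_date_folders }.
  def count_by_dag
    count_by_op.transform_values { |ops| ops.sum { |_op, count| count } }
  end
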
  # Compact inspect output so large folder lists are not dumped to the console.
  def inspect
    "\#<#{self.class.name}: #{object_id}, #{{ list: @list&.size, to_delete: @to_delete&.size }.inspect}>"
  end
end

retention_days = ARGV[0].to_i
if retention_days > 0
  AirflowLogCleaner.new.perform(retention_days)
end
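
The selection step can be tried without touching the disk. The following is a minimal dry-run sketch, not part of the gist: `stale_dirs` is a hypothetical helper, the sample paths are invented, and the `now:` parameter is an assumption added so the cutoff is reproducible. It applies the same `Time.parse(File.basename(dir)) < threshold` comparison and treats names that `Time.parse` rejects as not stale.

```ruby
require 'time'

# Hypothetical helper: returns the paths whose last component parses to a
# time before the retention cutoff. Nothing is deleted.
def stale_dirs(dirs, retention_days, now: Time.now)
  threshold = now - retention_days * 24 * 3600
  dirs.select do |dir|
    Time.parse(File.basename(dir)) < threshold
  rescue ArgumentError
    false # name is not a parseable date, so the folder is kept
  end
end

# Invented sample data; a fixed "now" keeps the result reproducible.
now = Time.utc(2021, 10, 10, 18, 27)
sample = [
  '/var/log/airflow/example_dag/example_task/2021-10-01T00:00:00+00:00',
  '/var/log/airflow/example_dag/example_task/2021-10-09T00:00:00+00:00',
  '/var/log/airflow/example_dag/example_task/tmp'
]

puts stale_dirs(sample, 7, now: now)
```

With a 7-day retention and the fixed clock above, only the 2021-10-01 folder falls before the cutoff. Printing the candidates first is a cheap way to check the date parsing against real folder names before running the destructive `delete!` step.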