Vector s3sync wrapper for exec source
#!/usr/bin/env python3
import argparse
import json
import logging
import os
import random
import signal
import subprocess
import sys
import time

sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)

logging.basicConfig(
    format='%(asctime)s.%(msecs)03d000Z %(levelname)s %(name)s::%(funcName)s: %(message)s',
    datefmt='%Y-%m-%dT%H:%M:%S')
lg = logging.getLogger('s3sync')

METRICS = {
    "total_executions": 0,
    "successful_executions": 0,
    "failed_executions": 0,
    "synced_files": 0,
    "synced_size": 0,
    "empty_files": 0,
    "deleted_files": 0,
    "execution_time": 0,
    "sync_time": 0,
    "cleanup_time": 0,
}
METRICS_TAGS = {}

SHUTDOWN = False
SHUTDOWN_SYNC = 0


def sigterm_handler(_signo, _stack_frame):
    global SHUTDOWN
    if SHUTDOWN is True:
        lg.warning("Signal {} received during clean shutdown, terminating immediately".format(_signo))
        sys.exit(0)
    else:
        lg.warning("Signal {} received, running clean shutdown".format(_signo))
        SHUTDOWN = True


def parse_args(args=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--region', help="S3 region", required=True)
    parser.add_argument('-w', '--workers', type=int, help="S3sync workers",
                        default=64)
    parser.add_argument('-i', '--interval', help="Sync interval", type=int,
                        default=10)
    parser.add_argument('-m', '--filter-mtime', help="Sync only files that have not changed for this many seconds", type=int, default=60)
    parser.add_argument('--delete-mtime', help="Delete files that have not changed for this many seconds (0 = delete immediately, -1 = never delete)", type=int, default=600)
    parser.add_argument('-v', '--verbose', help="Enable verbose logging",
                        action="store_true", required=False)
    parser.add_argument('-d', '--debug', help="Enable debug logging",
                        action="store_true", required=False)
    parser.add_argument('source', help="Source directory")
    parser.add_argument('target', help="Target directory")
    return parser.parse_args(args)


def print_metrics():
    print(json.dumps({**METRICS, **METRICS_TAGS}))
    # Reset counters
    for k in METRICS.keys():
        METRICS[k] = 0


def delete_empty_path(path, basedir=None):
    if not os.path.isdir(path):
        # Not a directory (a file or nothing), so work with its parent directory
        path = os.path.dirname(path)
    if path == basedir:
        lg.debug("Skipping deletion of base directory {}".format(basedir))
        return
    try:
        os.rmdir(path)
        lg.debug("Deleted empty directory {}".format(path))
    except OSError:
        # Not empty, don't do anything
        lg.debug("{} not empty or doesn't exist, skipping deletion".format(path))
        return
    # Removed a directory, so process its parent the same way
    delete_empty_path(os.path.dirname(path), basedir)


def cleanup_path(path, filter_before_mtime):
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            file = os.path.join(root, name)
            stat = os.stat(file)
            if stat.st_mtime < filter_before_mtime:
                os.remove(file)
                lg.debug("Deleted old file {} (mtime -{} seconds)".format(file, int(time.time() - stat.st_mtime)))
                METRICS["deleted_files"] += 1
                delete_empty_path(file, path)


def main():
    args = parse_args()
    if args.verbose:
        lg.setLevel(logging.INFO)
    else:
        lg.setLevel(logging.WARNING)
    if args.debug:
        lg.setLevel(logging.DEBUG)

    global SHUTDOWN_SYNC
    METRICS_TAGS["source"] = args.source
    METRICS_TAGS["target"] = args.target

    # The initial sync covers everything changed within the last 5 minutes
    t_last_synced = int(time.time()) - 5 * 60

    while True:
        t_start = time.time()
        if not os.path.isdir(args.source):
            lg.warning("Source directory {} does not exist, skipping sync".format(args.source))
            if SHUTDOWN is True:
                sys.exit(0)
            time.sleep(args.interval + random.randrange(0, int(args.interval / 2)))
            continue

        if SHUTDOWN is True:
            # Graceful shutdown, sync everything
            lg.info("Processing graceful shutdown, ignoring --filter-mtime and syncing everything")
            filter_mtime = int(time.time())
            SHUTDOWN_SYNC += 1
        else:
            filter_mtime = int(time.time()) - args.filter_mtime

        try:
            lg.info("Running s3sync from source fs://{}/ to s3://{}/".format(args.source, args.target))
            t_sync_start = time.time()
            s3sync_args = ["--tk", os.getenv("AWS_ACCESS_KEY_ID"),
                           "--ts", os.getenv("AWS_SECRET_ACCESS_KEY"),
                           "--tr", args.region,
                           "-w", str(args.workers),
                           "--filter-before-mtime", str(filter_mtime),
                           "--s3-retry", "3",
                           "--s3-retry-sleep", str(random.randrange(1, 8)),
                           "--fs-disable-xattr",
                           "--sync-log",
                           "--sync-log-format", "json"]
            if args.delete_mtime != 0:
                # If we delete immediately, we always sync everything;
                # otherwise sync only what has changed since the last run
                s3sync_args.extend(["--filter-after-mtime", str(t_last_synced)])

            proc = subprocess.run(["s3sync", *s3sync_args,
                                   "fs://{}/".format(args.source),
                                   "s3://{}/".format(args.target)],
                                  capture_output=True, text=True)
            METRICS["sync_time"] = int((time.time() - t_sync_start) * 1000)
            METRICS["total_executions"] += 1

            if proc.returncode > 0:
                lg.error("Process exited with return code {}, some files might fail to sync".format(proc.returncode))
                METRICS["failed_executions"] += 1
                if args.delete_mtime != 0:
                    # If we delete successfully synced files, we can simply
                    # retry failed files on the next run.
                    # Otherwise, if we keep files and delete them later, we
                    # don't want to re-upload everything on the next run; in
                    # that case failed files are skipped (unless their mtime
                    # is updated).
                    t_last_synced = filter_mtime
            else:
                METRICS["successful_executions"] += 1
                t_last_synced = filter_mtime

            for line in proc.stderr.splitlines():
                try:
                    line = json.loads(line)
                except Exception:
                    lg.exception("Failed to parse output line: {}".format(line))
                    continue
                if line.get("key"):
                    path = os.path.join(args.source, line["key"])
                    if line["size"] > 10:
                        lg.debug("Synced file {}".format(path))
                        METRICS["synced_files"] += 1
                    else:
                        lg.debug("Synced empty file {}".format(path))
                        METRICS["empty_files"] += 1
                    if args.delete_mtime == 0:
                        lg.debug("Deleting synced file: {}".format(path))
                        os.unlink(path)
                        lg.debug("Deleting empty directory structure: {}".format(os.path.dirname(path)))
                        delete_empty_path(os.path.dirname(path), args.source)
                    METRICS["synced_size"] += line["size"]
                if line.get("level") == "error":
                    lg.error(line['msg'])

            if args.delete_mtime > 0 and not SHUTDOWN:
                # Cleanup files that have not been modified in the past args.delete_mtime seconds
                lg.info("Running cleanup (mtime -{} minutes)".format(args.delete_mtime / 60))
                t_cleanup_start = time.time()
                cleanup_path(args.source, int(time.time() - args.delete_mtime))
                METRICS["cleanup_time"] = int((time.time() - t_cleanup_start) * 1000)
            METRICS["execution_time"] = int((time.time() - t_start) * 1000)
        finally:
            print_metrics()

        if SHUTDOWN is True and SHUTDOWN_SYNC >= 3:
            lg.warning("Exiting gracefully after {} runs".format(SHUTDOWN_SYNC))
            sys.exit(0)
        time.sleep(args.interval + random.randrange(0, int(args.interval / 2)))


if __name__ == '__main__':
    signal.signal(signal.SIGTERM, sigterm_handler)
    signal.signal(signal.SIGINT, sigterm_handler)
    main()
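
The script is meant to be launched by Vector's exec source (see the config in the comment below): it loops forever, shells out to s3sync, and after every iteration prints a single JSON line with its counters and tags to stdout, while its own log messages go to stderr. As a rough illustration only (the field values below are made up, source/target match the example command in the comment), a metrics line looks like:

    {"total_executions": 1, "successful_executions": 1, "failed_executions": 0, "synced_files": 42, "synced_size": 1048576, "empty_files": 0, "deleted_files": 0, "execution_time": 2150, "sync_time": 1900, "cleanup_time": 200, "source": "/var/lib/vector/out_file", "target": "logs-bucket"}

The time fields are milliseconds, and all counters reset after each print, so they represent a single sync iteration.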
fpytloun commented Mar 17, 2022

To use the file sink instead of the s3 sink:

    # Use file sink and external tool to sync S3
    [sinks.out_file]
      type = "file"
      inputs = ["remap_file"]
      compression = "gzip"
      encoding.codec = "ndjson"
      path = "/var/lib/vector/out_file/topics/{{ _topic }}/year=%Y/month=%m/day=%d/hour=%H/minute=%M/${HOSTNAME}.json.gz"
      idle_timeout_secs = 30

To configure s3sync as an exec source, parse its logs, and expose its metrics:

    [sources.in_sync_s3]
      type = "exec"
      mode = "streaming"
      command = ["s3sync.py", "--region", "us-west-2", "--filter-mtime", "90", "/var/lib/vector/out_file", "logs-bucket"]
      include_stderr = true
      streaming.respawn_interval_secs = 5

    [transforms.parse_sync_s3_metrics]
      type = "remap"
      inputs = ["in_sync_s3"]
      source = '''
      if match(.message, r'^\{.*') ?? false {
        . = parse_json!(.message)
      } else {
        abort
      }
      '''

    [transforms.log_to_metric_sync_s3]
      type = "log_to_metric"
      inputs = ["parse_sync_s3_metrics"]
      metrics = [
        { type = "gauge", field = "execution_time", name = "s3sync_execution_time", namespace = "vector", tags = {source = "{{source}}", target = "{{target}}"} },
        { type = "gauge", field = "sync_time", name = "s3sync_sync_time", namespace = "vector", tags = {source = "{{source}}", target = "{{target}}"} },
        { type = "gauge", field = "cleanup_time", name = "s3sync_cleanup_time", namespace = "vector", tags = {source = "{{source}}", target = "{{target}}"} },
        { type = "counter", increment_by_value = true, field = "total_executions", name = "s3sync_total_executions", namespace = "vector", tags = {source = "{{source}}", target = "{{target}}"} },
        { type = "counter", increment_by_value = true, field = "successful_executions", name = "s3sync_successful_executions", namespace = "vector", tags = {source = "{{source}}", target = "{{target}}"} },
        { type = "counter", increment_by_value = true, field = "failed_executions", name = "s3sync_failed_executions", namespace = "vector", tags = {source = "{{source}}", target = "{{target}}"} },
        { type = "counter", increment_by_value = true, field = "synced_files", name = "s3sync_synced_files", namespace = "vector", tags = {source = "{{source}}", target = "{{target}}"} },
        { type = "counter", increment_by_value = true, field = "deleted_files", name = "s3sync_deleted_files", namespace = "vector", tags = {source = "{{source}}", target = "{{target}}"} },
        { type = "counter", increment_by_value = true, field = "empty_files", name = "s3sync_empty_files", namespace = "vector", tags = {source = "{{source}}", target = "{{target}}"} }
      ]

    [transforms.parse_sync_s3_log]
      type = "remap"
      inputs = ["in_sync_s3"]
      source = '''
      if match(.message, r'^\{.*') ?? false {
        abort
      }
      '''

    [sinks.out_sync_s3]
      type = "console"
      inputs = ["parse_sync_s3_log"]
      encoding.codec = "text"
