Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# typed: true
# frozen_string_literal: true
require('dev')
require('fileutils')
module Dev
module Helpers
class APFSVolume
extend(T::Sig)
NotMounted = Class.new(StandardError)
AlreadyExists = Class.new(StandardError)
SYSTEM_KEYCHAIN = "/Library/Keychains/System.keychain"
SYNTHETIC_CONF = '/etc/synthetic.conf'
sig { params(name: String, mountpoint: String).void }
def initialize(name, mountpoint)
raise("unsupported name") if name =~ /\s/
@name = name
@mountpoint = mountpoint
end
def dep
Dev::Dep::Chain.new(title: "#{@name} Volume") \
<< mountpoint_existence \
<< volume_existence \
<< mount_status \
<< ownership_bit \
<< ownership_of_mountpoint \
<< filevault \
<< passphrase_migration \
<< fstab_inclusion \
<< spotlight_indexing
end
private
def mountpoint_existence
Dev::DepFactory.new(self, "Existence of #{@mountpoint} mountpoint")
.met? { |_ctx| Dir.exist?(@mountpoint) }
.meet(&method(:create_mountpoint))
.build
end
def volume_existence
Dev::DepFactory.new(self, "Existence of #{@name} volume")
.met? { |ctx| volumes_with_name(ctx).size > 0 }
.meet(&method(:create_volume))
.build
end
def mount_status
Dev::DepFactory.new(self, "Mounted-ness of #{@name} volume on #{@mountpoint}")
.met? { |ctx| !mounted_device_path(ctx).nil? }
.meet(&method(:mount_volume))
.build
end
def ownership_bit
Dev::DepFactory.new(self, "'Owners' bit on #{@name} volume")
.met? { |ctx| disk_info(ctx)['Owners'] == 'Enabled' }
.meet(&method(:enable_ownership))
.build
end
def ownership_of_mountpoint
Dev::DepFactory.new(self, "Mountpoint #{@mountpoint} ownership")
.met? { |_ctx| File.stat(@mountpoint).uid == Process.uid }
.meet(&method(:chown_mountpoint))
.build
end
def filevault
Dev::DepFactory.new(self, "FileVault enabled status on #{@name} volume")
.met? { |ctx| disk_info(ctx)['FileVault'] == 'Yes' }
.meet(&method(:enable_filevault))
.build
end
def passphrase_migration
Dev::DepFactory.new(self, "FileVault password migration for #{@name} volume to System keychain")
.met?(&method(:passphrase_in_system_keychain?))
.meet(&method(:migrate_passphrase))
.build
end
def fstab_inclusion
Dev::DepFactory.new(self, "Presence of #{@name} volume in /etc/fstab")
.met?(&method(:in_fstab?))
.meet(&method(:add_to_fstab))
.build
end
def spotlight_indexing
Dev::DepFactory.new(self, "Prevention of Spotlight's unfortunate tendency to index #{@mountpoint}")
.met?(&method(:spotlight_indexing_disabled?))
.meet(&method(:disable_spotlight_indexing))
.build
end
def create_mountpoint(ctx)
if File.dirname(@mountpoint) != '/'
raise(Dev::Bug, "didn't expect this to be called with a non-root directory")
end
# e.g. nix isntead of /nix
unless synthetic_conf =~ /^#{@mountpoint[1..-1]}$/m
ctx.sudo_reason("password required to edit {{bold:#{SYNTHETIC_CONF}}}")
Dev::Util.sudo_append_file(ctx, SYNTHETIC_CONF, "#{@mountpoint[1..-1]}\n")
end
unless Dir.exist?(@mountpoint)
oe, stat = ctx.capture2e("/System/Library/Filesystems/apfs.fs/Contents/Resources/apfs.util", "-B")
unless stat.success?
raise(Dev::Bug, "apfs.util failed: #{oe}")
end
end
end
def create_volume(ctx)
_, err, stat = ctx.capture3_sudo(
'diskutil', 'apfs', 'addVolume', root_disk(ctx), 'APFSX', @name, '-mountpoint', @mountpoint,
)
raise(Dev::Bug, "diskutil create_volume failed: #{err}") unless stat.success?
purge
end
sig { params(ctx: Dev::Context).returns(T::Boolean) }
def volume_mounted?(ctx)
mounted_device_path(ctx) != nil
end
sig { params(ctx: Dev::Context).void }
def mount_volume(ctx)
_, err, stat = ctx.capture3_sudo('diskutil', 'mount', @mountpoint)
raise(Dev::Abort, "diskutil mount_volume failed: #{err}") unless stat.success?
end
sig { params(ctx: Dev::Context).void }
def enable_filevault(ctx)
require('securerandom')
passphrase = SecureRandom.hex(32)
save_passphrase(ctx, passphrase)
_, err, stat = ctx.capture3_sudo(
'diskutil', 'apfs', 'enableFileVault', mounted_device_path(ctx),
'-user', 'disk', '-passphrase', passphrase,
)
raise(Dev::Bug, "failed to enable FileVault: #{err}") unless stat.success?
purge
end
def in_fstab?(_ctx)
File.read("/etc/fstab").include?(@mountpoint)
rescue Errno::ENOENT
false
end
sig { params(ctx: Dev::Context).void }
def add_to_fstab(ctx)
ctx.sudo_reason("need to edit {{info:/etc/fstab}}")
entry = "LABEL=#{@name} #{@mountpoint} apfs rw"
Dev::Util.sudo_append_file(ctx, '/etc/fstab', "#{entry}\n")
end
def user_keychains(ctx)
@user_keychains ||= begin
out, err, stat = ctx.capture3('security', 'list-keychains', '-d', 'user')
raise(Dev::Bug, "failed to enumerate user keychains: #{err}") unless stat.success?
# line example: %{ "/Users/...."\n}
out.lines.map { |l| l.scan(/"(.*)"/).flatten.first }
end
end
def save_passphrase(ctx, passphrase)
ctx.sudo_reason("need to write filevault passphrase to System keychain")
ctx.system_sudo('true')
_, err, stat = ctx.capture3_sudo(
'security', 'add-generic-password',
'-l', @name,
'-a', uuid(ctx),
'-s', uuid(ctx),
'-D', 'Encrypted Volume Password',
'-w', passphrase,
'-T', '/System/Library/CoreServices/APFSUserAgent',
'-T', '/System/Library/CoreServices/CSUserAgent',
SYSTEM_KEYCHAIN
)
if !stat.success? && err !~ /The specified item already exists in the keychain/
raise(Dev::Abort, "unable to store password in keychain: #{err}")
end
end
def passphrase_in_system_keychain?(ctx)
out, _, stat = ctx.capture3(
'security', 'find-generic-password', '-l', @name, '-a', uuid(ctx),
)
return(false) unless stat.success?
# includes a line 'keychain: "<path>"'
out.include?(SYSTEM_KEYCHAIN)
end
def get_passphrase(ctx)
out, err, stat = ctx.capture3(
'security', 'find-generic-password', '-l', @name, '-a', uuid(ctx), '-w'
)
raise(Dev::Abort, "unable to retrieve filevault password from login keychain: #{err}") unless stat.success?
out.chomp
end
def delete_passphrase_from_user_keychains(ctx)
out, err, stat = ctx.capture3(
'security', 'delete-generic-password', '-l', @name, '-a', uuid(ctx), *user_keychains(ctx)
)
raise(Dev::Abort, "unable to remove passphrase from user keychains: #{err}") unless stat.success?
out.chomp
end
def migrate_passphrase(ctx)
passphrase = get_passphrase(ctx)
save_passphrase(ctx, passphrase)
delete_passphrase_from_user_keychains(ctx)
end
sig { params(ctx: Dev::Context).returns(String) }
def uuid(ctx)
disk_info(ctx).fetch('Volume UUID')
end
sig { params(ctx: Dev::Context).void }
def enable_ownership(ctx)
ctx.system_sudo('true')
_, err, stat = ctx.capture3_sudo('diskutil', 'enableOwnership', mounted_device_path(ctx))
raise(Dev::Bug, "diskutil enableOwnership failed: #{err}") unless stat.success?
purge
end
def chown_mountpoint(ctx)
# We don't bother with -R because it should be empty other than
# .Trashes and some other stuff that we don't have permission to
# modify anyway.
_, err, stat = ctx.capture3_sudo('chown', Process.uid.to_s, @mountpoint)
raise(Dev::Bug, "chown failed: #{err}") unless stat.success?
end
def spotlight_indexing_disabled?(ctx)
Helpers::Mdutil.indexing_enabled?(ctx, @mountpoint) == false
rescue Helpers::Mdutil::IndexingDisabled
true
rescue Helpers::Mdutil::UnknownIndexingState
# seems like spotlight's just kinda borked, not much I can figure out
# to recover from this one.
true
end
def disable_spotlight_indexing(ctx)
Helpers::Mdutil.set_indexing(ctx, @mountpoint, false)
rescue Helpers::Mdutil::IndexingDisabled
nil
end
sig { void }
def purge
@disk_info = nil
@volumes_with_name = nil
@mounted_device_path = nil
end
# `mount` prints something like:
# /dev/disk1s5 on / (apfs, ...)
# we want the "disk1" part.
sig { params(ctx: Dev::Context).returns(String) }
def root_disk(ctx)
out, err, stat = ctx.capture3('mount')
raise(Dev::Bug, "mount failed: #{err}") unless stat.success?
T.must(out.scan(%r{/dev/(disk\d)s\d on / }m).flatten.first)
end
# returns the mounted device (e.g. /dev/disk1s6) if mounted; else nil.
sig { params(ctx: Dev::Context).returns(T.nilable(String)) }
def mounted_device_path(ctx)
@mounted_device_path ||= begin
out, err, stat = ctx.capture3('/bin/df', @mountpoint)
raise("df failed: #{err}") unless stat.success?
# e.g. ["/dev/disk1s6", ..., "/nix"]
fields = out.lines.drop(1).first.split(/\s+/)
return(nil) unless fields[-1] == @mountpoint
fields.first
end
end
sig { params(ctx: Dev::Context).returns(T::Array[String]) }
def volumes_with_name(ctx)
@volumes_with_name ||= begin
out, err, stat = ctx.capture3('diskutil', 'list')
raise("diskutil list failed: #{err}") unless stat.success?
# out should contain a line like:
# 6: APFS Volume Nix 15.9 GB disk1s6
# one guy has a volume called "NIX" and I'm not sure why, but we'll
# match case-insensitive to catch it too in case more people have
# that.
out.scan(/APFS Volume #{@name}\s+[\d\.]+ .B\s+(\S+)/i).flatten.map { |d| "/dev/#{d}" }
end
end
def synthetic_conf
File.read(SYNTHETIC_CONF)
rescue Errno::ENOENT
''
end
# asks diskutil for information on whichever device is mounted at the
# @mountpoint.
sig { params(ctx: Dev::Context).returns(T::Hash[String, String]) }
def disk_info(ctx)
@disk_info ||= begin
device = mounted_device_path(ctx)
raise(NotMounted) unless device
disk_info_for_device(ctx, device)
end
end
def disk_info_for_device(ctx, device)
out, err, stat = ctx.capture3('diskutil', 'info', device)
if stat.success?
Hash[out.scan(/\s+(.*?):\s+(.*?)$/m)]
else
raise(Dev::Bug, "diskutil info error: #{err}")
end
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment