Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# typed: true
# frozen_string_literal: true
require('dev')
require('fileutils')
module Dev
module Helpers
class APFSVolume
extend(T::Sig)
NotMounted = Class.new(StandardError)
AlreadyExists = Class.new(StandardError)
SYSTEM_KEYCHAIN = "/Library/Keychains/System.keychain"
SYNTHETIC_CONF = '/etc/synthetic.conf'
sig { params(name: String, mountpoint: String).void }
# Builds a helper for the APFS volume called +name+ that should be mounted
# at +mountpoint+.
#
# Raises RuntimeError when the name contains any whitespace.
def initialize(name, mountpoint)
  if name =~ /\s/
    raise("unsupported name")
  end

  @mountpoint = mountpoint
  @name = name
end
# Returns the dependency chain for this volume. Each link is built by the
# private builder method of the same name and appended in the listed order.
def dep
  builders = %i[
    mountpoint_existence
    volume_existence
    mount_status
    ownership_bit
    ownership_of_mountpoint
    filevault
    passphrase_migration
    fstab_inclusion
    spotlight_indexing
  ]
  builders.inject(Dev::Dep::Chain.new(title: "#{@name} Volume")) do |chain, builder|
    chain << send(builder)
  end
end
private
# Dep: the mountpoint directory exists; met by create_mountpoint.
def mountpoint_existence
  title = "Existence of #{@mountpoint} mountpoint"
  checked = Dev::DepFactory.new(self, title).met? { |_ctx| Dir.exist?(@mountpoint) }
  checked.meet(&method(:create_mountpoint)).build
end
# Dep: at least one APFS volume with our name exists; met by create_volume.
def volume_existence
  title = "Existence of #{@name} volume"
  checked = Dev::DepFactory.new(self, title).met? { |ctx| volumes_with_name(ctx).size > 0 }
  checked.meet(&method(:create_volume)).build
end
# Dep: some device is mounted at the mountpoint; met by mount_volume.
def mount_status
  title = "Mounted-ness of #{@name} volume on #{@mountpoint}"
  checked = Dev::DepFactory.new(self, title).met? do |ctx|
    device = mounted_device_path(ctx)
    !device.nil?
  end
  checked.meet(&method(:mount_volume)).build
end
# Dep: diskutil reports 'Owners' as 'Enabled' for the mounted volume;
# met by enable_ownership.
def ownership_bit
  title = "'Owners' bit on #{@name} volume"
  checked = Dev::DepFactory.new(self, title).met? do |ctx|
    owners = disk_info(ctx)['Owners']
    owners == 'Enabled'
  end
  checked.meet(&method(:enable_ownership)).build
end
# Dep: the mountpoint directory is owned by the uid of the current process;
# met by chown_mountpoint.
def ownership_of_mountpoint
  title = "Mountpoint #{@mountpoint} ownership"
  checked = Dev::DepFactory.new(self, title).met? do |_ctx|
    owner_uid = File.stat(@mountpoint).uid
    owner_uid == Process.uid
  end
  checked.meet(&method(:chown_mountpoint)).build
end
# Dep: diskutil reports 'FileVault' as 'Yes' for the mounted volume;
# met by enable_filevault.
def filevault
  title = "FileVault enabled status on #{@name} volume"
  checked = Dev::DepFactory.new(self, title).met? do |ctx|
    status = disk_info(ctx)['FileVault']
    status == 'Yes'
  end
  checked.meet(&method(:enable_filevault)).build
end
# Dep: the volume passphrase is stored in the System keychain;
# met by migrate_passphrase.
def passphrase_migration
  title = "FileVault password migration for #{@name} volume to System keychain"
  checked = Dev::DepFactory.new(self, title).met?(&method(:passphrase_in_system_keychain?))
  checked.meet(&method(:migrate_passphrase)).build
end
# Dep: /etc/fstab mentions the volume; met by add_to_fstab.
def fstab_inclusion
  title = "Presence of #{@name} volume in /etc/fstab"
  checked = Dev::DepFactory.new(self, title).met?(&method(:in_fstab?))
  checked.meet(&method(:add_to_fstab)).build
end
# Dep: Spotlight indexing is off for the mountpoint;
# met by disable_spotlight_indexing.
def spotlight_indexing
  title = "Prevention of Spotlight's unfortunate tendency to index #{@mountpoint}"
  checked = Dev::DepFactory.new(self, title).met?(&method(:spotlight_indexing_disabled?))
  checked.meet(&method(:disable_spotlight_indexing)).build
end
# Makes sure the mountpoint directory exists directly under /.
#
# Appends the mountpoint's name to SYNTHETIC_CONF when no line of that file
# is exactly that name, then, if the directory still does not exist, runs
# `apfs.util -B`.
#
# Raises Dev::Bug when the mountpoint is not a direct child of / or when
# apfs.util exits unsuccessfully.
def create_mountpoint(ctx)
  if File.dirname(@mountpoint) != '/'
    raise(Dev::Bug, "didn't expect this to be called with a non-root directory")
  end

  # synthetic.conf entries are relative to /, e.g. nix instead of /nix
  entry = @mountpoint[1..-1]

  # Escape the entry so names containing regex metacharacters (".", "+", ...)
  # are compared literally rather than as a pattern.
  unless synthetic_conf =~ /^#{Regexp.escape(entry)}$/
    ctx.sudo_reason("password required to edit {{bold:#{SYNTHETIC_CONF}}}")
    Dev::Util.sudo_append_file(ctx, SYNTHETIC_CONF, "#{entry}\n")
  end

  return if Dir.exist?(@mountpoint)

  # NOTE(review): presumably -B makes the system create the synthetic.conf
  # entries without a reboot -- confirm against apfs.util documentation.
  oe, stat = ctx.capture2e("/System/Library/Filesystems/apfs.fs/Contents/Resources/apfs.util", "-B")
  raise(Dev::Bug, "apfs.util failed: #{oe}") unless stat.success?
end
# Adds a new APFSX volume named @name to the root disk via
# `sudo diskutil apfs addVolume`, with @mountpoint as its mountpoint, then
# drops the memoized disk state.
#
# Raises Dev::Bug when diskutil exits unsuccessfully.
def create_volume(ctx)
  command = [
    'diskutil', 'apfs', 'addVolume', root_disk(ctx), 'APFSX', @name, '-mountpoint', @mountpoint
  ]
  _, stderr, status = ctx.capture3_sudo(*command)
  unless status.success?
    raise(Dev::Bug, "diskutil create_volume failed: #{stderr}")
  end
  purge
end
sig { params(ctx: Dev::Context).returns(T::Boolean) }
# True when a device is currently mounted at the mountpoint.
def volume_mounted?(ctx)
  device = mounted_device_path(ctx)
  device != nil
end
sig { params(ctx: Dev::Context).void }
# Runs `sudo diskutil mount <mountpoint>`.
#
# Raises Dev::Abort when diskutil exits unsuccessfully.
#
# NOTE(review): diskutil is handed the mountpoint path rather than a device
# identifier or volume name -- confirm diskutil can resolve an unmounted
# volume from that path.
def mount_volume(ctx)
  _, stderr, status = ctx.capture3_sudo('diskutil', 'mount', @mountpoint)
  return if status.success?

  raise(Dev::Abort, "diskutil mount_volume failed: #{stderr}")
end
sig { params(ctx: Dev::Context).void }
# Generates a random passphrase, saves it with save_passphrase, and then
# enables FileVault on the mounted device with that passphrase. The memoized
# disk state is dropped afterwards.
#
# Raises Dev::Bug when diskutil exits unsuccessfully.
def enable_filevault(ctx)
  require('securerandom')

  secret = SecureRandom.hex(32)
  # The passphrase is saved before diskutil is asked to encrypt.
  save_passphrase(ctx, secret)

  command = [
    'diskutil', 'apfs', 'enableFileVault', mounted_device_path(ctx),
    '-user', 'disk', '-passphrase', secret
  ]
  _, stderr, status = ctx.capture3_sudo(*command)
  unless status.success?
    raise(Dev::Bug, "failed to enable FileVault: #{stderr}")
  end
  purge
end
# True when /etc/fstab has a non-comment entry whose mount-point field
# (second whitespace-separated column) is exactly @mountpoint.
#
# A plain substring search would also match commented-out lines and longer
# paths that merely start with the mountpoint (e.g. /nixos for /nix).
#
# Returns false when /etc/fstab does not exist.
def in_fstab?(_ctx)
  File.read("/etc/fstab").each_line.any? do |line|
    fields = line.split
    next false if fields.empty? || fields.first.start_with?('#')

    fields[1] == @mountpoint
  end
rescue Errno::ENOENT
  false
end
sig { params(ctx: Dev::Context).void }
# Appends a "LABEL=<name> <mountpoint> apfs rw" line to /etc/fstab via sudo.
def add_to_fstab(ctx)
  ctx.sudo_reason("need to edit {{info:/etc/fstab}}")
  fstab_line = "LABEL=#{@name} #{@mountpoint} apfs rw" + "\n"
  Dev::Util.sudo_append_file(ctx, '/etc/fstab', fstab_line)
end
# Returns the paths printed by `security list-keychains -d user`, memoized
# after the first successful call.
#
# Lines that do not contain a double-quoted path (e.g. blank lines) are
# skipped, so the result never contains nil; callers splat this list into a
# command line.
#
# Raises Dev::Bug when the security command exits unsuccessfully.
def user_keychains(ctx)
  @user_keychains ||= begin
    out, err, stat = ctx.capture3('security', 'list-keychains', '-d', 'user')
    raise(Dev::Bug, "failed to enumerate user keychains: #{err}") unless stat.success?
    # line example: %{    "/Users/...."\n}
    out.lines.map { |line| line[/"(.*)"/, 1] }.compact
  end
end
# Stores +passphrase+ in the System keychain as an 'Encrypted Volume
# Password' item labelled with the volume name and keyed by the volume UUID.
#
# An "already exists" failure from `security` is tolerated; any other
# failure raises Dev::Abort.
def save_passphrase(ctx, passphrase)
  ctx.sudo_reason("need to write filevault passphrase to System keychain")
  ctx.system_sudo('true')

  command = [
    'security', 'add-generic-password',
    '-l', @name,
    '-a', uuid(ctx),
    '-s', uuid(ctx),
    '-D', 'Encrypted Volume Password',
    '-w', passphrase,
    '-T', '/System/Library/CoreServices/APFSUserAgent',
    '-T', '/System/Library/CoreServices/CSUserAgent',
    SYSTEM_KEYCHAIN
  ]
  _, stderr, status = ctx.capture3_sudo(*command)

  return if status.success?
  return if stderr =~ /The specified item already exists in the keychain/

  raise(Dev::Abort, "unable to store password in keychain: #{stderr}")
end
# True when `security find-generic-password` finds the volume's item and its
# output mentions the System keychain path.
def passphrase_in_system_keychain?(ctx)
  stdout, _, status = ctx.capture3(
    'security', 'find-generic-password', '-l', @name, '-a', uuid(ctx),
  )
  # on success the output includes a line 'keychain: "<path>"'
  status.success? ? stdout.include?(SYSTEM_KEYCHAIN) : false
end
# Reads the volume's passphrase with `security find-generic-password -w` and
# returns it without the trailing newline.
#
# Raises Dev::Abort when the lookup fails.
def get_passphrase(ctx)
  lookup = ['security', 'find-generic-password', '-l', @name, '-a', uuid(ctx), '-w']
  stdout, stderr, status = ctx.capture3(*lookup)
  unless status.success?
    raise(Dev::Abort, "unable to retrieve filevault password from login keychain: #{stderr}")
  end
  stdout.chomp
end
# Deletes the volume's passphrase item from every keychain returned by
# user_keychains and returns the command's chomped stdout.
#
# Raises Dev::Abort when the deletion fails.
def delete_passphrase_from_user_keychains(ctx)
  removal = ['security', 'delete-generic-password', '-l', @name, '-a', uuid(ctx)]
  stdout, stderr, status = ctx.capture3(*removal, *user_keychains(ctx))
  unless status.success?
    raise(Dev::Abort, "unable to remove passphrase from user keychains: #{stderr}")
  end
  stdout.chomp
end
# Copies the existing passphrase into the System keychain, then removes it
# from the user keychains.
def migrate_passphrase(ctx)
  save_passphrase(ctx, get_passphrase(ctx))
  delete_passphrase_from_user_keychains(ctx)
end
sig { params(ctx: Dev::Context).returns(String) }
# The 'Volume UUID' field of the mounted volume's diskutil info.
# Raises KeyError when diskutil did not report that field.
def uuid(ctx)
  info = disk_info(ctx)
  info.fetch('Volume UUID')
end
sig { params(ctx: Dev::Context).void }
# Runs `sudo diskutil enableOwnership` on the mounted device and drops the
# memoized disk state.
#
# Raises Dev::Bug when diskutil exits unsuccessfully.
def enable_ownership(ctx)
  ctx.system_sudo('true')
  device = mounted_device_path(ctx)
  _, stderr, status = ctx.capture3_sudo('diskutil', 'enableOwnership', device)
  unless status.success?
    raise(Dev::Bug, "diskutil enableOwnership failed: #{stderr}")
  end
  purge
end
# Hands the mountpoint directory to the current user with `sudo chown`.
#
# Raises Dev::Bug when chown exits unsuccessfully.
def chown_mountpoint(ctx)
  # Not recursive (-R) on purpose: the volume should be empty apart from
  # .Trashes and similar entries we have no permission to modify anyway.
  owner = Process.uid.to_s
  _, stderr, status = ctx.capture3_sudo('chown', owner, @mountpoint)
  return if status.success?

  raise(Dev::Bug, "chown failed: #{stderr}")
end
# True when Mdutil reports that indexing is not enabled for the mountpoint.
def spotlight_indexing_disabled?(ctx)
  enabled = Helpers::Mdutil.indexing_enabled?(ctx, @mountpoint)
  enabled == false
rescue Helpers::Mdutil::IndexingDisabled, Helpers::Mdutil::UnknownIndexingState
  # IndexingDisabled is the state we want. UnknownIndexingState seems to mean
  # spotlight is just kinda borked, and there is no known way to recover from
  # it, so it is treated as met too.
  true
end
# Turns Spotlight indexing off for the mountpoint. An IndexingDisabled error
# from Mdutil is swallowed and nil is returned.
def disable_spotlight_indexing(ctx)
  begin
    Helpers::Mdutil.set_indexing(ctx, @mountpoint, false)
  rescue Helpers::Mdutil::IndexingDisabled
    nil
  end
end
sig { void }
# Forgets the memoized diskutil/df results so the next lookup re-queries the
# system.
def purge
  @mounted_device_path = nil
  @volumes_with_name = nil
  @disk_info = nil
end
# Returns the disk backing the root filesystem.
#
# `mount` prints something like:
#   /dev/disk1s5 on / (apfs, ...)
# or, when / is mounted from a snapshot:
#   /dev/disk3s1s1 on / (apfs, sealed, ...)
# we want the "disk1" / "disk3" part. Disk and slice numbers may have more
# than one digit, and a snapshot suffix (the trailing "s1") is tolerated.
#
# Raises Dev::Bug when `mount` fails; T.must raises when no root entry is
# found in its output.
sig { params(ctx: Dev::Context).returns(String) }
def root_disk(ctx)
  out, err, stat = ctx.capture3('mount')
  raise(Dev::Bug, "mount failed: #{err}") unless stat.success?
  T.must(out[%r{^/dev/(disk\d+)s\d+(?:s\d+)* on / }, 1])
end
# returns the mounted device (e.g. /dev/disk1s6) if mounted; else nil.
#
# Asks `/bin/df` about the mountpoint and only accepts the answer when the
# last column of the first data line is the mountpoint itself; otherwise df
# is describing the parent filesystem. A non-nil result is memoized; nil is
# recomputed on every call.
#
# Raises RuntimeError when df exits unsuccessfully.
sig { params(ctx: Dev::Context).returns(T.nilable(String)) }
def mounted_device_path(ctx)
  @mounted_device_path ||= begin
    out, err, stat = ctx.capture3('/bin/df', @mountpoint)
    raise("df failed: #{err}") unless stat.success?
    # Skip the header; df may print nothing else, in which case we are not
    # mounted rather than crashing on nil.
    data_line = out.lines[1]
    return(nil) if data_line.nil?
    # e.g. ["/dev/disk1s6", ..., "/nix"]
    fields = data_line.split
    return(nil) unless fields.last == @mountpoint
    fields.first
  end
end
sig { params(ctx: Dev::Context).returns(T::Array[String]) }
# Returns the /dev paths of every APFS volume in `diskutil list` whose name
# matches @name (case-insensitively). The result is memoized.
#
# Raises RuntimeError when diskutil exits unsuccessfully.
def volumes_with_name(ctx)
  @volumes_with_name ||= begin
    out, err, stat = ctx.capture3('diskutil', 'list')
    raise("diskutil list failed: #{err}") unless stat.success?
    # out should contain a line like:
    #   6:                APFS Volume Nix                     15.9 GB    disk1s6
    # one guy has a volume called "NIX" and I'm not sure why, but we'll
    # match case-insensitive to catch it too in case more people have
    # that.
    # The name is escaped so that regex metacharacters in it are matched
    # literally instead of being interpreted as a pattern.
    pattern = /APFS Volume #{Regexp.escape(@name)}\s+[\d\.]+ .B\s+(\S+)/i
    out.scan(pattern).flatten.map { |d| "/dev/#{d}" }
  end
end
# Contents of SYNTHETIC_CONF, or an empty string when the file is missing.
def synthetic_conf
  begin
    File.read(SYNTHETIC_CONF)
  rescue Errno::ENOENT
    ''
  end
end
# Memoized `diskutil info` fields for whichever device is currently mounted
# at @mountpoint.
#
# Raises NotMounted when nothing is mounted there.
sig { params(ctx: Dev::Context).returns(T::Hash[String, String]) }
def disk_info(ctx)
  return @disk_info if @disk_info

  device = mounted_device_path(ctx)
  raise(NotMounted) if device.nil?

  @disk_info = disk_info_for_device(ctx, device)
end
# Runs `diskutil info <device>` and returns its "Key:   Value" lines as a
# Hash of String => String.
#
# The output is parsed one line at a time. A line must have a value on the
# same line as its key, so section headings that end in a colon (e.g.
# "... APFS Information:") are skipped instead of swallowing the following
# line as their value. Trailing whitespace is stripped from values.
#
# Raises Dev::Bug when diskutil exits unsuccessfully.
def disk_info_for_device(ctx, device)
  out, err, stat = ctx.capture3('diskutil', 'info', device)
  raise(Dev::Bug, "diskutil info error: #{err}") unless stat.success?

  out.each_line.each_with_object({}) do |line, info|
    match = line.match(/\A\s*(.+?):[ \t]+(.*?)\s*\z/)
    info[match[1]] = match[2] if match
  end
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.