Last active
December 11, 2023 21:14
-
-
Save mikeharty/2763ae9e79707f0295d8a658344c44db to your computer and use it in GitHub Desktop.
Token Flow - OAuth2 CLI utility
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'optparse' | |
require 'net/http' | |
require 'json' | |
require 'jwt' | |
require 'time' | |
# Runs the whole tool, one step after another: read the .env file, merge and
# validate the options, fetch and print the tokens, then perform the optional
# export / append steps.
#
# Returns whatever the last step (append) returns.
def flow
  %i[env_opts combine_opts validate_opts token_flow export append]
    .map { |step| send(step) }
    .last
end
# Hash used as the `into:` target of OptionParser#parse!.
#
# Keys written through []= are normalised to snake_case symbols
# ("grant-type" -> :grant_type), and the values stored under :grant_type and
# :format are converted to symbols as well.
class CLIInputHash < Hash
  SYMBOL_VALUED_KEYS = %i[grant_type format].freeze

  def []=(key, value)
    normalized = key.to_s.tr('-', '_').to_sym
    value = value.to_sym if SYMBOL_VALUED_KEYS.include?(normalized)
    super(normalized, value)
  end
end
# Global state and static tables for the tool.
#
#   :config      - merged, effective options (filled by combine_opts)
#   :input       - options parsed from the command line
#   :env         - options read from the .env file
#   :default     - fallback values
#   :required    - required option names, per grant type plus :all
#   :optional    - optional option names, per grant type plus :all
#   :envToCfgMap - .env variable name => option name
#   :log         - buffered log entries
#   :grant_types - supported grant types
#   :formats     - supported output formats
#
# Only the outer hash is frozen: the nested hashes/arrays are mutated at
# runtime, but no new top-level key can be added.
FLW = {
  :config => {},
  :input => CLIInputHash.new,
  :env => {},
  :default => {
    :env_file => './.env.tokenflow',
    :format => :human,
    :grant_type => :authorization_code,
    :client_id => nil,
    :client_secret => nil,
    :oauth2_host => nil,
    :redirect_uri => nil,
    :scope => nil,
    :callback_port => 3030,
    :callback_path => '/auth/callback'
  },
  :required => {
    :all => %i[grant_type client_id client_secret oauth2_host],
    :authorization_code => %i[redirect_uri callback_port callback_path],
    :refresh_token => %i[client_secret refresh_token],
    :client_credentials => %i[]
  },
  :optional => {
    :all => %i[format env_file],
    :authorization_code => %i[scope identity_provider],
    :refresh_token => %i[],
    :client_credentials => %i[scope]
  },
  :envToCfgMap => {
    'TF_OAUTH2_GRANT_TYPE' => :grant_type,
    'TF_OAUTH2_HOST' => :oauth2_host,
    'TF_OAUTH2_CLIENT_ID' => :client_id,
    'TF_OAUTH2_CLIENT_SECRET' => :client_secret,
    'TF_OAUTH2_REDIRECT_URI' => :redirect_uri,
    # Was mapped to :redirect_uri, which made the identity provider setting
    # overwrite the redirect URI and made the inverse mapping ambiguous.
    'TF_OAUTH2_IDENTITY_PROVIDER' => :identity_provider,
    'TF_OAUTH2_SCOPE' => :scope,
    'TF_OAUTH2_REFRESH_TOKEN' => :refresh_token,
    'TF_OAUTH2_FORMAT' => :format,
    'TF_OAUTH2_CALLBACK_PORT' => :callback_port,
    'TF_OAUTH2_CALLBACK_PATH' => :callback_path,
    'TF_ACCESS_TOKEN' => :access_token
  },
  :log => [],
  :grant_types => %i[authorization_code refresh_token client_credentials],
  :formats => %i[none json human]
}.freeze
# Command line definition. Parsed options are stored in FLW[:input], whose
# []= turns "--grant-type" style names into :grant_type style keys.
#
# Every value-taking switch names its argument explicitly. Without a
# placeholder OptionParser treats a switch as a boolean flag, which is what
# previously happened to --callback-port and --callback-path.
formats = FLW[:formats].map(&:to_s)
grants = FLW[:grant_types].map(&:to_s)
OptionParser.new do |opts|
  opts.banner = <<~README
    \e[108m\e[32m w\e[31mu\e[34mr\e[35ml \e[0m OAuth2 Token Flow
    I use this script in a variety of ways in my workflow, e.g. I
    run it as a part of a cron task during working hours to keep access
    tokens fresh in my various projects, as a part of various tests, and
    when I want to authenticate against an API as myself, rather than
    using client credentials and authenticating as machine user.
    Configurable with command line options, environment variables, or from
    a .env file (in that order of precedence if there are conflicts.)
    It suports most simple OAuth2 flows, but it's not a general purpose
    OAuth2 client - it solves my problems. Notably, it does not support
    PKCE code challenge.
    By default it uses authorization code flow, which will open a browser
    window to authenticate your user. Behind the scenes, it starts a
    small web server that waits for a callback from the OAuth provider.
    Your OAuth provider must be configured to allow redirecting to a host
    name accessible on your machine.
    By default, tokens are printed to stdout.
    Format options:
    "none" - access token only
    "human" - readable access token with details
    "json" - json formatted
    Also supports writing to a file or copying to clipoard via pbcopy.
    This wasn't built to be released. Caveat emptor.
    See -h for more details.
  README

  # Renders a dim, italic "[name]" tag appended to an option's description.
  tag = ->(name) { "\e[90m\e[03m[#{name}]\e[0m" }

  opts.on('-e', '--env-file PATH', String, 'path to .env file (default is ./.env.tokenflow)')
  opts.on('-g', '--grant-type TYPE', grants, 'grant type', tag[:grant_type])
  opts.on('-c', '--client-id ID', String, 'client id')
  opts.on('-i', '--client-secret SECRET', String, 'client secret')
  opts.on('-s', '--scope SCOPES', String, 'requested scopes, comma delimited', tag[:scope])
  opts.on('-u', '--oauth2-host URL', String, 'OAuth2 host', tag[:oauth2_host])
  opts.on('-l', '--callback-port PORT', String, 'Callback port', tag[:callback_port])
  opts.on('-p', '--callback-path PATH', String, 'Callback path', tag[:callback_path])
  opts.on('-f', '--format FORMAT', formats, 'output format', tag[:format])
  opts.on('-x', '--export PATH', String, 'export config path')
  opts.on('-y', '--copy', 'copy access key to clipboard')
  opts.on('-a', '--append-to PATH', String, 'append access key to .env file')
  opts.on('-h', '--help', 'prints this help') do
    puts opts
    puts "\n"
    exit true
  end
end.parse!(into: FLW[:input])
# Looks +key+ up in three option sources and returns the first truthy value,
# checking a, then b, then c. A source that is nil (or false) is skipped
# rather than indexed.
def clobber(key, a, b, c)
  from_a, from_b, from_c = [a, b, c].map { |source| source && source[key] }
  from_a || from_b || from_c
end
# Translates a .env variable name (e.g. "TF_OAUTH2_HOST") into the matching
# option key, or nil when the variable is not one this tool knows about.
def map_env_to_cfg(key)
  env_to_cfg = FLW[:envToCfgMap]
  env_to_cfg[key]
end
# Translates an option key (e.g. :oauth2_host) into the .env variable name that
# maps to it, or nil when no variable does.
#
# The lookup is done directly on FLW[:envToCfgMap]. FLW is frozen, so caching
# an inverted copy under a new FLW key raises FrozenError. Hash#key also
# returns the first matching variable, whereas Hash#invert would keep the last
# one when two variables share an option.
def map_cfg_to_env(key)
  FLW[:envToCfgMap].key(key)
end
# Loads options from the .env file into FLW[:env].
#
# The path comes from --env-file, falling back to the default path. A missing
# file is not an error: a message is logged and nothing is loaded.
#
# Only KEY=value lines whose KEY appears in FLW[:envToCfgMap] are used. Every
# known key is accepted regardless of grant type, because the file itself may
# set the grant type. Lines with an empty value are ignored so that "KEY="
# cannot shadow a default with an empty string. Values for :grant_type and
# :format are stored as symbols.
def env_opts
  env_path = FLW[:input][:env_file] || FLW[:default][:env_file]
  if env_path.nil? || !File.exist?(env_path)
    log('.env file not found, maybe that was intentional. Continuing without it.')
    return
  end

  File.readlines(env_path).each do |line|
    # chomp also removes "\r\n", so CRLF files do not leak "\r" into values.
    match = /\A([A-Z0-9_]+)=(.*)\z/.match(line.chomp)
    next unless match

    env_key, val = match.captures
    next if val.empty?

    cfg_key = map_env_to_cfg(env_key)
    next if cfg_key.nil?

    val = val.to_sym if %i[grant_type format].include?(cfg_key)
    FLW[:env][cfg_key] = val
  end
end
# Lists every option name that applies to +grant_type+: the grant-specific
# required and optional names first, then the ones shared by all grant types.
def get_allowed_opts(grant_type)
  [grant_type, :all].flat_map do |scope|
    FLW[:required][scope] + FLW[:optional][scope]
  end
end
# Combine the CLI<ENV<DEFAULT options in order of precedence.
#
# The effective grant type is resolved first because it decides which options
# are relevant. Each relevant option is then resolved the same way and stored
# in FLW[:config]; options with no value anywhere are left out.
#
# Raises RuntimeError when the grant type is not one of FLW[:grant_types]. The
# .env file is not checked when it is read, so a typo there would otherwise
# surface as a NoMethodError on nil.
#
# Returns the inspect string of the merged config.
def combine_opts
  sources = [FLW[:input], FLW[:env], FLW[:default]]
  grant_type = clobber(:grant_type, *sources)
  unless FLW[:grant_types].include?(grant_type)
    raise "Unsupported grant type #{grant_type.inspect}, expected one of: #{FLW[:grant_types].join(', ')}"
  end

  get_allowed_opts(grant_type).each do |option|
    result = clobber(option, *sources)
    FLW[:config][option] = result unless result.nil?
  end
  FLW[:config].inspect
end
# Checks that every option required for the configured grant type (plus the
# options required for all grant types) has a value in FLW[:config].
#
# On the first missing option the full list of required options is logged and
# a RuntimeError naming that option is raised. Afterwards a String grant type
# is converted to a Symbol.
def validate_opts
  cfg = FLW[:config]
  grant_type = cfg[:grant_type]
  required = FLW[:required][grant_type] + FLW[:required][:all]

  missing = required.find { |opt| cfg[opt].nil? }
  if missing
    log("Required options for grant type #{grant_type}: #{required.join(', ')}")
    raise "Missing required option #{missing}"
  end

  cfg[:grant_type] = cfg[:grant_type].to_sym if cfg[:grant_type].is_a?(String)
end
# Records or prints a message according to FLW[:config][:format].
#
#   :json  - plain messages are buffered in FLW[:log]; an error prints a single
#            JSON object holding the message, the error and the buffered log.
#   :human - messages are printed immediately with a timestamp; an error is
#            printed in red followed by its backtrace.
#   other  - (including before the format is known) messages are buffered and
#            only printed, together with the backtrace, when an error arrives.
#
# msg - text to log
# e   - optional exception; its presence marks the entry as an error
def log(msg, e = nil)
  epoch = Time.now.to_i
  now = Time.now.strftime('%Y-%m-%d %H:%M:%S')
  failed = !e.nil?

  case FLW[:config][:format]
  when :json
    entry = { :timestamp => epoch, :message => msg }
    return FLW[:log] << entry unless failed

    entry.update(:error => e.message, :status => :error, :log => FLW[:log])
    puts entry.to_json
  when :human
    return puts("[#{now}] #{msg}") unless failed

    puts "\e[31m[#{now}] #{msg}\e[0m"
    puts e.backtrace.join("\n")
  else
    # Quiet mode: keep everything in memory so that chained output stays clean
    # unless something goes wrong.
    FLW[:log].push({ :timestamp => now, :message => msg })
    return unless failed

    FLW[:log].each { |m| puts "[#{m[:timestamp]}] #{m[:message]}" }
    puts e.backtrace.join("\n")
  end
end
# Fetches tokens for the configured grant type and prints them.
#
# The authorization_code grant first obtains an authorization code through the
# browser; every other grant type requests tokens directly.
def token_flow
  interactive = FLW[:config][:grant_type] == :authorization_code
  tokens = interactive ? get_tokens(get_authorization_code) : get_tokens
  print_tokens(tokens)
end
# Requests tokens from "<oauth2_host>/oauth2/token" and returns the raw
# response body (a JSON string).
#
# auth_code - authorization code, only used by the authorization_code grant
#
# The client authenticates with HTTP Basic auth (client id / client secret).
# Form fields without a value (e.g. an unset scope) are not sent, and TLS is
# enabled only when the host URL uses https.
#
# Raises RuntimeError when the response status is not 200.
def get_tokens(auth_code = nil)
  cfg = FLW[:config]
  req_body = {
    :client_id => cfg[:client_id],
    :grant_type => cfg[:grant_type]
  }
  case cfg[:grant_type]
  when :authorization_code
    req_body[:code] = auth_code
    req_body[:redirect_uri] = cfg[:redirect_uri]
    req_body[:scope] = cfg[:scope]
  when :client_credentials
    req_body[:scope] = cfg[:scope]
  when :refresh_token
    req_body[:refresh_token] = cfg[:refresh_token]
  end
  req_body = req_body.reject { |_, value| value.nil? }

  uri = URI.parse("#{cfg[:oauth2_host]}/oauth2/token")
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == 'https'

  request = Net::HTTP::Post.new(uri)
  # Produces the same "Basic <base64(id:secret)>" header as before without
  # relying on Base64 having been loaded by another library.
  request.basic_auth(cfg[:client_id], cfg[:client_secret])
  request.set_form_data(req_body)

  response = http.request(request)
  raise "Error getting token: #{response.code} #{response.message} #{response.body}" if response.code != '200'

  response.body
end
# Runs the interactive part of the authorization code flow and returns the
# authorization code.
#
# The authorize URL is opened with the `open` command (the URL is printed if
# that fails). A TCP server then listens on the callback port until a GET
# request for the callback path carrying a non-empty `code` parameter arrives.
# Any other request is answered with 404 and logged, and listening continues.
def get_authorization_code
  cfg = FLW[:config]
  uri = URI("#{cfg[:oauth2_host]}/oauth2/authorize")
  query = {
    :response_type => 'code',
    :client_id => cfg[:client_id],
    :redirect_uri => cfg[:redirect_uri]
  }
  query[:scope] = cfg[:scope] if cfg[:scope]
  uri.query = URI.encode_www_form(query)
  auth_url = uri.to_s

  log 'Opening login form in your default browser, return here after authentication...'
  # Argument-list form: the URL is never interpreted by a shell.
  unless system('open', auth_url)
    puts 'Opening browser failed, visit this URL to sign-in:'
    puts auth_url
  end

  auth_code = nil
  server = TCPServer.new(cfg[:callback_port])
  begin
    while auth_code.nil?
      client = server.accept
      begin
        request_line = client.gets
        auth_code = callback_auth_code(request_line, cfg[:callback_path])
        if auth_code
          client.puts("HTTP/1.1 200 OK\r\n\r\nAll set, check your terminal.")
        else
          client.puts("HTTP/1.1 404 Not Found\r\n\r\n")
          log "Received something unexpected on port #{cfg[:callback_port]}: #{request_line.to_s.strip}"
          log "Expected a GET request to #{cfg[:callback_path]} with an auth code."
          log "If you're not sure what to do, try visiting this URL in your browser:\n"
          log auth_url
          log "I'll keep listening on port #{cfg[:callback_port]}..."
        end
      ensure
        # Close the connection on success as well as on failure.
        client.close
      end
    end
  ensure
    server.close
  end
  auth_code
end

# Extracts the URL-decoded `code` query parameter from an HTTP request line
# such as "GET /auth/callback?code=abc&state=x HTTP/1.1".
#
# Returns nil unless the request is a GET whose path ends with callback_path
# and whose query holds a non-empty `code`. A nil request line (the client
# disconnected) or a malformed query also yields nil.
def callback_auth_code(request_line, callback_path)
  verb, target, = request_line.to_s.split
  return nil unless verb == 'GET' && target

  path, query = target.split('?', 2)
  return nil unless path.end_with?(callback_path.to_s)

  pair = URI.decode_www_form(query.to_s).assoc('code')
  code = pair && pair.last
  code unless code.nil? || code.empty?
rescue ArgumentError
  nil
end
# Prints one token in the human readable layout: a label and expiry line
# between dashed rules, the token itself, and a closing rule.
#
# token - token text
# type  - label, e.g. "Access Token"
# exp   - an Integer is treated as seconds from now and shown as a local
#         timestamp; anything else is printed as given
def print_token(token, type, exp)
  exp = Time.at(Time.now.to_i + exp).strftime('%Y-%m-%d %H:%M:%S') if exp.is_a? Integer
  rule = '-' * 30
  puts rule,
       "\e[94m#{type}\e[0m",
       "\e[94mExpires: #{exp}\e[0m",
       rule,
       token,
       rule
end
# Prints the token endpoint response in the configured output format and
# optionally copies the access token to the clipboard.
#
# tokens - raw JSON response body
#
# With --export or --append-to the access token is stored in
# FLW[:config][:access_token] for those steps, and the access token and expiry
# in the parsed response are replaced by "<REDACTED>".
#
# NOTE(review): the :json format prints the raw body, so it is not affected by
# that redaction. Kept as is; confirm whether this is intended.
def print_tokens(tokens)
  parsed = JSON.parse(tokens)
  access_token = parsed['access_token']
  # Check the parsed response, not the raw String: indexing the String only
  # tested whether the body contained the text "access_token".
  copy_to_clipboard(access_token) if FLW[:input][:copy] && !access_token.nil?

  if FLW[:input][:export] || FLW[:input][:append_to]
    FLW[:config][:access_token] = access_token
    parsed['access_token'] = '<REDACTED>'
    parsed['expires_in'] = '<REDACTED>'
  end

  case FLW[:config][:format]
  when :none
    puts parsed['access_token']
  when :json
    puts tokens
  else
    expires_in = parsed['expires_in']
    [['id_token', 'ID Token'], ['access_token', 'Access Token'], ['refresh_token', 'Refresh Token']].each do |field, label|
      print_token(parsed[field], label, expires_in) unless parsed[field].nil?
    end
  end
end

# Pipes +text+ into pbcopy without going through a shell, so characters such
# as quotes, "%" or "$" in the token are copied verbatim. A missing or failing
# pbcopy is logged instead of aborting the run.
def copy_to_clipboard(text)
  IO.popen(['pbcopy'], 'w') { |io| io.write(text) }
rescue SystemCallError => e
  log("Could not copy access token to clipboard: #{e.message}")
end
# Renders +config+ as .env file content, one "NAME=value" line per option.
# Options without a matching .env variable name, or whose value is nil, are
# left out. Returns the content as a single String.
def config_as_env_file(config)
  config.each_with_object([]) do |(cfg_key, value), lines|
    env_key = map_cfg_to_env(cfg_key)
    lines << "#{env_key}=#{value}\n" unless env_key.nil? || value.nil?
  end.join
end
# Writes the effective configuration to the path given with --export, in .env
# format, and logs where it went. Does nothing when --export was not given.
def export
  target = FLW[:input][:export]
  return unless target

  File.write(target, config_as_env_file(FLW[:config]))
  log("Exported config to #{target}")
end
# Stores the access token as TOKEN_FLOW_ACCESS_TOKEN in the file given with
# --append-to. Does nothing when --append-to was not given.
#
# An existing TOKEN_FLOW_ACCESS_TOKEN line is replaced in place; otherwise the
# entry is added at the end. All other lines are kept. A target file that does
# not exist yet is created instead of failing with ENOENT.
def append
  path = FLW[:input][:append_to]
  return unless path

  entry = "TOKEN_FLOW_ACCESS_TOKEN=#{FLW[:config][:access_token]}"
  existing = File.exist?(path) ? File.readlines(path) : []

  written = false
  lines = existing.map do |line|
    next line unless line.start_with?('TOKEN_FLOW_ACCESS_TOKEN=')

    written = true
    entry
  end
  lines << entry unless written

  # IO#puts terminates every element that lacks a trailing newline.
  File.open(path, 'w') { |file| file.puts lines }
  log("Wrote access token to #{path} as TOKEN_FLOW_ACCESS_TOKEN")
end
# Script entry point: run the flow; on any StandardError log it, dump the
# whole FLW state as JSON for debugging, and exit with a failure status.
#
# NOTE(review): the dump includes everything stored in FLW, such as the client
# secret and any fetched access token. Confirm that is acceptable wherever the
# output ends up.
begin
  flow
rescue => e
  log(e.message, e)
  puts JSON.pretty_generate(FLW)
  exit(false)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment