Skip to content

Instantly share code, notes, and snippets.

@mikeharty
Last active December 11, 2023 21:14
Show Gist options
  • Save mikeharty/2763ae9e79707f0295d8a658344c44db to your computer and use it in GitHub Desktop.
Save mikeharty/2763ae9e79707f0295d8a658344c44db to your computer and use it in GitHub Desktop.
Token Flow - OAuth2 CLI utility
require 'optparse'
require 'net/http'
require 'json'
require 'jwt'
require 'time'
def flow
env_opts
combine_opts
validate_opts
token_flow
export
append
end
# Hacked Hash that converts CLI options to snake case
# and converts grant_type and format to symbols
class CLIInputHash < Hash
def []=(key, value)
key = key.to_s
key = key.gsub('-', '_')
key = key.to_sym
value = value.to_sym if %i[grant_type format].include?(key)
super(key, value)
end
end
FLW = {
:config => {},
:input => CLIInputHash.new,
:env => {},
:default => {
:env_file => './.env.tokenflow',
:format => :human,
:grant_type => :authorization_code,
:client_id => nil,
:client_secret => nil,
:oauth2_host => nil,
:redirect_uri => nil,
:scope => nil,
:callback_port => 3030,
:callback_path => '/auth/callback'
},
:required => {
:all => %i[grant_type client_id client_secret oauth2_host],
:authorization_code => %i[redirect_uri callback_port callback_path],
:refresh_token => %i[client_secret refresh_token],
:client_credentials => %i[]
},
:optional => {
:all => %i[format env_file],
:authorization_code => %i[scope identity_provider],
:refresh_token => %i[],
:client_credentials => %i[scope]
},
:envToCfgMap => {
'TF_OAUTH2_GRANT_TYPE' => :grant_type,
'TF_OAUTH2_HOST' => :oauth2_host,
'TF_OAUTH2_CLIENT_ID' => :client_id,
'TF_OAUTH2_CLIENT_SECRET' => :client_secret,
'TF_OAUTH2_REDIRECT_URI' => :redirect_uri,
'TF_OAUTH2_IDENTITY_PROVIDER' => :redirect_uri,
'TF_OAUTH2_SCOPE' => :scope,
'TF_OAUTH2_REFRESH_TOKEN' => :refresh_token,
'TF_OAUTH2_FORMAT' => :format,
'TF_OAUTH2_CALLBACK_PORT' => :callback_port,
'TF_OAUTH2_CALLBACK_PATH' => :callback_path,
'TF_ACCESS_TOKEN' => :access_token
},
:log => [],
:grant_types => %i[authorization_code refresh_token client_credentials],
:formats => %i[none json human]
}.freeze
formats = FLW[:formats].map(&:to_s)
grants = FLW[:grant_types].map(&:to_s)
OptionParser.new do |opts|
opts.banner = <<~README
\e[108m\e[32m w\e[31mu\e[34mr\e[35ml \e[0m OAuth2 Token Flow
I use this script in a variety of ways in my workflow, e.g. I
run it as a part of a cron task during working hours to keep access
tokens fresh in my various projects, as a part of various tests, and
when I want to authenticate against an API as myself, rather than
using client credentials and authenticating as machine user.
Configurable with command line options, environment variables, or from
a .env file (in that order of precedence if there are conflicts.)
It suports most simple OAuth2 flows, but it's not a general purpose
OAuth2 client - it solves my problems. Notably, it does not support
PKCE code challenge.
By default it uses authorization code flow, which will open a browser
window to authenticate your user. Behind the scenes, it starts a
small web server that waits for a callback from the OAuth provider.
Your OAuth provider must be configured to allow redirecting to a host
name accessible on your machine.
By default, tokens are printed to stdout.
Format options:
"none" - access token only
"human" - readable access token with details
"json" - json formatted
Also supports writing to a file or copying to clipoard via pbcopy.
This wasn't built to be released. Caveat emptor.
See -h for more details.
README
d = ->(d) { "\e[90m\e[03m[#{d}]\e[0m" }
opts.on('-e', '--env-file ', String, 'path to .env file (default is ./.env.tokenflow)')
opts.on('-g', '--grant-type ', grants, 'grant type', d[:grant_type])
opts.on('-c', '--client-id ', String, 'client id')
opts.on('-i', '--client-secret ', String, 'client secret')
opts.on('-s', '--scope ', String, 'requested scopes, comma delimited', d[:scope])
opts.on('-u', '--oauth2-host ', String, 'OAuth2 host', d[:oauth2_host])
opts.on('-l', '--callback-port', String, 'Callback port', d[:callback_port])
opts.on('-p', '--callback-path', String, 'Callback path', d[:callback_path])
opts.on('-f', '--format ', formats, 'output format', d[:format])
opts.on('-x', '--export ', String, 'export config path')
opts.on('-y', '--copy', 'copy access key to clipboard')
opts.on('-a', '--append-to ', String, 'append access key to .env file')
opts.on('-h', '--help', 'prints this help') do
puts opts
puts "\n"
exit true
end
end.parse!(into: FLW[:input])
def clobber(key, a, b, c)
a &&= a[key]
b &&= b[key]
c &&= c[key]
a || b || c
end
def map_env_to_cfg(key)
FLW[:envToCfgMap][key]
end
def map_cfg_to_env(key)
FLW[:cfgToEnvMap] = FLW[:envToCfgMap].invert if FLW[:cfgToEnvMap].nil?
FLW[:cfgToEnvMap][key]
end
def env_opts
env_path = FLW[:input][:env_file] || FLW[:default][:env_file]
if env_path.nil? || !File.exist?(env_path)
log('.env file not found, maybe that was intentional. Continuing without it.')
return
end
# It's possible the env file will change the grant type, so we allow all options for now
allowed_opts = FLW[:envToCfgMap].keys
File.readlines(env_path).each do |line|
next unless line =~ /^([A-Z0-9_]+)=(.*)$/
next unless Regexp.last_match(2).length
key = Regexp.last_match(1)
val = Regexp.last_match(2)
next unless allowed_opts.include?(key)
cfg_key = map_env_to_cfg(key)
val = val.to_sym if %i[grant_type format].include?(cfg_key)
FLW[:env][cfg_key] = val unless cfg_key.nil?
end
end
def get_allowed_opts(grant_type)
FLW[:required][grant_type] + FLW[:optional][grant_type] + FLW[:required][:all] + FLW[:optional][:all]
end
# Combine the CLI<ENV<DEFAULT options in order of precedence
def combine_opts
grant_type = clobber(:grant_type, FLW[:input], FLW[:env], FLW[:default])
allowed_options = get_allowed_opts(grant_type)
allowed_options.each do |option|
result = clobber(option, FLW[:input], FLW[:env], FLW[:default])
FLW[:config][option] = result unless result.nil?
end
FLW[:config].inspect
end
def validate_opts
grant_type = FLW[:config][:grant_type]
required = FLW[:required][grant_type] + FLW[:required][:all]
required.each do |opt|
if FLW[:config][opt].nil?
log("Required options for grant type #{grant_type}: #{required.join(', ')}")
raise "Missing required option #{opt}"
end
end
return unless FLW[:config][:grant_type].is_a? String
FLW[:config][:grant_type] = FLW[:config][:grant_type].to_sym
end
def log(msg, e = nil)
epoch = Time.now.to_i
now = Time.now.strftime('%Y-%m-%d %H:%M:%S')
case FLW[:config][:format]
when :json
# JSON logging
entry = {
:timestamp => epoch,
:message => msg
}
if e.nil?
FLW[:log] << entry
else
entry[:error] = e.message
entry[:status] = :error
entry[:log] = FLW[:log]
puts entry.to_json
end
when :human
# Human logging (default)
if e.nil?
puts "[#{now}] #{msg}"
else
puts "\e[31m[#{now}] #{msg}\e[0m"
puts e.backtrace.join("\n")
end
else
# No formatting, only log errors (for chaining)
entry = {
:timestamp => now,
:message => msg
}
FLW[:log].push entry
unless e.nil?
FLW[:log].each do |m|
puts "[#{m[:timestamp]}] #{m[:message]}"
end
puts e.backtrace.join("\n")
end
end
end
def token_flow
if FLW[:config][:grant_type] == :authorization_code
auth_code = get_authorization_code
tokens = get_tokens(auth_code)
else
tokens = get_tokens
end
print_tokens(tokens)
end
def get_tokens(auth_code = nil)
cfg = FLW[:config]
req_body = {
:client_id => cfg[:client_id],
:grant_type => cfg[:grant_type]
}
case cfg[:grant_type]
when :authorization_code
req_body[:code] = auth_code
req_body[:redirect_uri] = cfg[:redirect_uri]
req_body[:scope] = cfg[:scope]
when :client_credentials
req_body[:scope] = cfg[:scope]
when :refresh_token
req_body[:refresh_token] = cfg[:refresh_token]
end
req_headers = {
'Authorization' => "Basic #{Base64.strict_encode64("#{cfg[:client_id]}:#{cfg[:client_secret]}")}"
}
uri = URI.parse("#{cfg[:oauth2_host]}/oauth2/token")
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true
request = Net::HTTP::Post.new(uri, req_headers)
request.set_form_data(req_body)
response = http.request(request)
raise "Error getting token: #{response.code} #{response.message} #{response.body}" if response.code != '200'
response.body
end
def get_authorization_code
cfg = FLW[:config]
uri = URI("#{cfg[:oauth2_host]}/oauth2/authorize")
query = {
:response_type => 'code',
:client_id => cfg[:client_id],
:redirect_uri => cfg[:redirect_uri]
}
query[:scope] = cfg[:scope] if cfg[:scope]
uri.query = URI.encode_www_form(query)
log 'Opening login form in your default browser, return here after authentication...\n'
auth_url = uri.to_s
open = system "open '#{auth_url}'"
unless open
puts 'Opening browser failed, visit this URL to sign-in:\n'
puts auth_url
end
auth_code = nil
socket = TCPServer.new(cfg[:callback_port])
loop do
client = socket.accept
first_line = client.gets
verb, path, = first_line.split
if verb == 'GET' && (path =~ /#{cfg[:callback_path]}\?code=(.*)/)
client.puts("HTTP/1.1 200 OK\r\n\r\nAll set, check your terminal.")
auth_code = Regexp.last_match(1)
break
else
client.puts("HTTP/1.1 404 Not Found\r\n\r\n")
log 'Received something unexpected on port 3001: %s' % first_line
log "Expected a GET request to #{cfg[:callback_path]} with an auth code."
log "If you're not sure what to do, try visiting this URL in your browser:\n"
log auth_url
log "I'll keep listening on port #{cfg[:callback_port]}..."
end
client.close
end
socket.close
raise 'Authorization failed.' if auth_code.nil?
auth_code
end
def print_token(token, type, exp)
exp = Time.at(Time.now.to_i + exp).strftime('%Y-%m-%d %H:%M:%S') if exp.is_a? Integer
puts "#{'-' * 30}\n"
puts "\e[94m#{type}\e[0m\n"
puts "\e[94mExpires: #{exp}\e[0m\n"
puts "#{'-' * 30}\n"
puts token
puts "#{'-' * 30}\n"
end
def print_tokens(tokens)
parsed = JSON.parse(tokens)
system "printf \"#{parsed['access_token']}\" | pbcopy" if FLW[:input][:copy] && !tokens['access_token'].nil?
if FLW[:input][:export] || FLW[:input][:append_to]
FLW[:config][:access_token] = parsed['access_token']
parsed['access_token'] = '<REDACTED>'
parsed['expires_in'] = '<REDACTED>'
end
case FLW[:config][:format]
when :none
puts parsed['access_token']
when :json
puts tokens
else
expires_in = parsed['expires_in']
print_token(parsed['id_token'], 'ID Token', expires_in) unless parsed['id_token'].nil?
print_token(parsed['access_token'], 'Access Token', expires_in) unless parsed['access_token'].nil?
print_token(parsed['refresh_token'], 'Refresh Token', expires_in) unless parsed['refresh_token'].nil?
end
end
def config_as_env_file(config)
out = []
config.each do |k, v|
env_key = map_cfg_to_env(k)
next if env_key.nil? || v.nil?
out << "#{env_key}=#{v}\n"
end
out.join
end
def export
return unless FLW[:input][:export]
File.write(FLW[:input][:export], config_as_env_file(FLW[:config]))
log("Exported config to #{FLW[:input][:export]}")
end
def append
return unless FLW[:input][:append_to]
written = false
lines = File.readlines(FLW[:input][:append_to]).map do |line|
if line =~ /^TOKEN_FLOW_ACCESS_TOKEN=/
line = "TOKEN_FLOW_ACCESS_TOKEN=#{FLW[:config][:access_token]}"
written = true
end
line
end
lines << "TOKEN_FLOW_ACCESS_TOKEN=#{FLW[:config][:access_token]}" unless written
File.open(FLW[:input][:append_to], 'w') do |file|
file.puts lines
end
log("Wrote access token to #{FLW[:input][:append_to]} as TOKEN_FLOW_ACCESS_TOKEN")
end
begin
flow
rescue StandardError => e
log(e.message, e)
puts JSON.pretty_generate(FLW)
exit false
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment