Skip to content

Instantly share code, notes, and snippets.

@paralleltree
Last active January 8, 2023 15:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save paralleltree/cfd86f8c9b54ea3664315d465bd3d95a to your computer and use it in GitHub Desktop.
Save paralleltree/cfd86f8c9b54ea3664315d465bd3d95a to your computer and use it in GitHub Desktop.
Fitbit APIを通じて最新の心拍数を取得するやつ
require 'base64'
require 'date'
require 'json'
require 'time'

require 'oauth2'
# OAuth 2.0 application settings and the path of the local token cache.
# Each value can be overridden through the environment so that real
# credentials do not have to be committed to the source file; the fallbacks
# are the original placeholder values.
CLIENT_ID = ENV.fetch('FITBIT_CLIENT_ID', 'xxxx')
CLIENT_SECRET = ENV.fetch('FITBIT_CLIENT_SECRET', 'xxxx')
REDIRECT_URL = ENV.fetch('FITBIT_REDIRECT_URL', 'http://localhost:8888')
CREDENTIAL_CACHE = ENV.fetch('FITBIT_CREDENTIAL_CACHE', 'credentials.json')
# Persists an OAuth2 access/refresh token pair as a JSON document in a
# local file so that it can be reused across runs.
class TokenStore
  # @param cache_file_path [String] path of the JSON file holding the tokens
  def initialize(cache_file_path)
    @file_path = cache_file_path
  end

  # Writes the token pair to the cache file, replacing any previous content.
  #
  # @param token [#token, #refresh_token] object exposing both token strings
  # @return [Integer] number of bytes written
  def save(token)
    payload = JSON.generate({ access_token: token.token, refresh_token: token.refresh_token })
    # The file contains secrets: request owner-only permissions. The mode is
    # only applied when the file is created, not when it already exists.
    File.open(@file_path, 'w', 0o600) { |f| f.write(payload) }
  end

  # Reads the cached token pair and yields it as a two-element array
  # `[access_token, refresh_token]`.
  #
  # @yieldparam pair [Array(String, String)] access token and refresh token
  # @return [Object] whatever the block returns
  # @raise [RuntimeError] if the cache file does not exist
  def restore
    raise "credential cache not found: #{@file_path}" unless available?

    credentials = JSON.parse(File.read(@file_path))
    yield [credentials['access_token'], credentials['refresh_token']]
  end

  # @return [Boolean] whether the cache file exists
  def available?
    # File.exists? was deprecated and then removed in Ruby 3.2.
    File.exist?(@file_path)
  end
end
# Small Fitbit Web API client built on the oauth2 gem. It runs the
# authorization-code flow, stores the resulting token through the given
# token store, and refreshes an expired access token once per request.
class FitbitClient
  # Raised by #get when no token is available or the API reports the access
  # token as invalid.
  class InvalidAccessTokenError < StandardError; end

  AUTHORIZATION_URL = 'https://www.fitbit.com/oauth2/authorize'
  ACCESS_TOKEN_REQUEST_URL = 'https://api.fitbit.com/oauth2/token'

  # @param client_id [String] OAuth2 client id
  # @param client_secret [String] OAuth2 client secret
  # @param redirect_uri [String] redirect URI sent in both authorization steps
  # @param token_store [#available?, #restore, #save] persistence for the token pair
  def initialize(client_id, client_secret, redirect_uri, token_store)
    @redirect_uri = redirect_uri
    @token_store = token_store
    @client = OAuth2::Client.new(
      client_id,
      client_secret,
      # Scheme and host of the token endpoint ("https://api.fitbit.com").
      site: ACCESS_TOKEN_REQUEST_URL.match(%r{[^/]*//[^/]*}).to_s,
      authorize_url: AUTHORIZATION_URL,
      token_url: ACCESS_TOKEN_REQUEST_URL
    )
    return unless token_store.available?

    @token = token_store.restore do |access_token, refresh_token|
      OAuth2::AccessToken.new(@client, access_token, refresh_token: refresh_token)
    end
  end

  # Builds the URL the user has to visit to grant the 'heartrate' scope.
  #
  # The argument is accepted for backward compatibility only and is ignored:
  # the redirect URI given to the constructor is always used, so that it
  # matches the one #authorize sends with the token request.
  #
  # @return [String] authorization URL
  def authorize_url(_redirect_uri = nil)
    @client.auth_code.authorize_url(redirect_uri: @redirect_uri, scope: 'heartrate')
  end

  # Exchanges an authorization code for a token and saves it in the store.
  #
  # @param code [String] authorization code obtained via #authorize_url
  # @return [Object] the return value of the token store's #save
  def authorize(code)
    obtain_token do
      @client.auth_code.get_token(
        code,
        redirect_uri: @redirect_uri,
        client_id: @client.id,
        headers: authorization_header
      )
    end
  end

  # Performs a GET through the current token; the arguments are forwarded
  # unchanged to the token's #get.
  #
  # If the API answers with an `expired_token` error, the token is refreshed
  # and the request is retried exactly once.
  #
  # @return [Object] the response returned by the token's #get
  # @raise [InvalidAccessTokenError] if there is no token yet or the API
  #   reports `invalid_token`
  # @raise [OAuth2::Error] for any other API error, and for a second failure
  #   after a refresh
  def get(*args)
    raise InvalidAccessTokenError, 'no access token available; authorize first' unless @token

    refreshed = false
    begin
      @token.get(*args)
    rescue OAuth2::Error => e
      # Retry at most once so a persistently failing request cannot loop.
      raise if refreshed

      error_types = error_types_of(e)
      if error_types.include?('expired_token')
        obtain_token { @token.refresh!(headers: authorization_header) }
        refreshed = true
        retry
      end
      raise InvalidAccessTokenError if error_types.include?('invalid_token')

      # Unrelated API errors must surface instead of turning into a nil result.
      raise e
    end
  end

  private

  # Replaces the current token with the block's result and persists it.
  def obtain_token
    @token = yield
    @token_store.save(@token)
  end

  # HTTP Basic credentials header built from the client id and secret.
  def authorization_header
    # strict_encode64 emits no line feeds; encode64 wraps its output every
    # 60 characters, which would corrupt the header for long credentials.
    { Authorization: "Basic #{Base64.strict_encode64("#{@client.id}:#{@client.secret}")}" }
  end

  # Extracts the `errorType` values from the JSON body of an error response.
  # Returns an empty array when the body is not JSON or has another shape.
  def error_types_of(error)
    payload = JSON.parse(error.response.body.to_s)
    return [] unless payload.is_a?(Hash)

    Array(payload['errors']).select { |entry| entry.is_a?(Hash) }.map { |entry| entry['errorType'] }
  rescue JSON::ParserError
    []
  end
end
# Finds the most recent intraday heart-rate sample, looking at `today` first
# and then walking back one day at a time.
#
# @param client [#get] object whose #get(path) returns a response with a
#   JSON #body
# @param days [Integer] number of days to inspect, counting `today`
# @param today [Date] the day to start from
# @return [Hash{Symbol => Object}, nil] `{ time: Time, value: ... }` for the
#   last entry of the newest non-empty dataset, or nil when none of the
#   inspected days has data
def get_latest_heartbeat(client, days: 7, today: Date.today)
  days.times do |days_ago|
    date = today - days_ago
    response = client.get("/1/user/-/activities/heart/date/#{date.strftime('%Y-%m-%d')}/1d/1min.json")
    # A response without the intraday section is treated like an empty day
    # instead of failing on nil.
    latest = Array(JSON.parse(response.body).dig('activities-heart-intraday', 'dataset')).last
    next unless latest

    # "HH:MM:SS" -> seconds since midnight (the rightmost field has weight 1).
    offset = latest['time'].scan(/\d+/).reverse.each_with_index.sum { |part, i| part.to_i * (60**i) }
    hours, remainder = offset.divmod(3600)
    minutes, seconds = remainder.divmod(60)
    # Build the wall-clock time directly: adding the offset to local midnight
    # is off by an hour on days with a daylight-saving transition.
    return { time: Time.new(date.year, date.month, date.day, hours, minutes, seconds), value: latest['value'] }
  end
  nil
end
# Command-line entry point.
#   ruby <script> authorize  -- print the authorization URL, read the code
#                               from standard input and store the token
#   ruby <script>            -- print the latest heart-rate sample as JSON
token_store = TokenStore.new(CREDENTIAL_CACHE)
client = FitbitClient.new(CLIENT_ID, CLIENT_SECRET, REDIRECT_URL, token_store)

case ARGV[0]
when 'authorize'
  puts client.authorize_url(REDIRECT_URL)
  print 'Enter the authorization code: '
  # gets returns nil at end of input and chomp! returns nil when there is
  # nothing to strip, so normalise via to_s and the non-destructive strip.
  code = $stdin.gets.to_s.strip
  abort 'No authorization code given.' if code.empty?
  client.authorize(code)
else
  # A sample older than this many seconds (72 hours) is reported as "dead".
  stale_after = 72 * 60 * 60
  latest = get_latest_heartbeat(client) || {}
  # Without any sample, fall back to the epoch so the status becomes "dead".
  behind_in_sec = Time.now - (latest[:time] || Time.at(0))
  output = {
    last_sync_time: latest[:time]&.iso8601,
    heart_rate: latest[:value],
    status: behind_in_sec < stale_after ? 'alive' : 'dead'
  }
  puts JSON.pretty_generate(output)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment