Skip to content

Instantly share code, notes, and snippets.

@TooManyBees
Last active January 25, 2023 16:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TooManyBees/17b3318adad38b6b73e602f8d71bc05a to your computer and use it in GitHub Desktop.
Save TooManyBees/17b3318adad38b6b73e602f8d71bc05a to your computer and use it in GitHub Desktop.
Search script for my own mastodon posts
#!/usr/bin/env ruby
# My instance doesn't support full text searching of posts, but I always
# want to know whether or not I've already published the post I'm about
# to make, and have to scroll through my own timeline just to double
# check. Well no more!
# You must use the --url argument to select the instance and user to search.
# You must also use the --access-token argument if that server only accepts
# authenticated requests. (Get an access token from the server's Development
# settings tab.) You don't need to set either of these arguments if you already
# set the MASTO_SEARCH_URL and MASTO_ACCESS_TOKEN environment variables.
require "getoptlong"
require "open-uri"
require "json"
begin
require "htmlentities"
rescue LoadError
end
# Exit with status 1 when interrupted (Ctrl-C) or asked to terminate.
%w[SIGINT SIGTERM].each do |signal_name|
  Signal.trap(signal_name) { exit(1) }
end
# Command-line option definitions. --access-token and --url take a value;
# every other option is a plain flag.
opts = GetoptLong.new(
  ["--access-token", GetoptLong::REQUIRED_ARGUMENT],
  ["--all", "-a", GetoptLong::NO_ARGUMENT],
  ["--boosts", GetoptLong::NO_ARGUMENT],
  ["--case-insensitive", "-i", GetoptLong::NO_ARGUMENT],
  ["--help", "-h", GetoptLong::NO_ARGUMENT],
  ["--replies", GetoptLong::NO_ARGUMENT],
  ["--silent", "-s", GetoptLong::NO_ARGUMENT],
  ["--url", GetoptLong::REQUIRED_ARGUMENT],
  ["--verbose", "-v", GetoptLong::NO_ARGUMENT],
)

# Defaults. The URL and token come from the environment unless overridden by
# a non-empty command-line value.
masto_url = ENV["MASTO_SEARCH_URL"]
access_token = ENV["MASTO_ACCESS_TOKEN"]
verbose = false
silent = false
case_insensitive = false
exclude_reblogs = true
exclude_replies = true
search_all = false

begin
  opts.each do |opt, value|
    case opt
    when "--access-token"
      access_token = value unless value.empty?
    when "--all"
      search_all = true
    when "--boosts"
      exclude_reblogs = false
    when "--case-insensitive"
      case_insensitive = true
    when "--replies"
      exclude_replies = false
    when "--silent"
      # --silent and --verbose are mutually exclusive; the last one given wins.
      silent = true
      verbose = false
    when "--url"
      masto_url = value unless value.empty?
    when "--verbose"
      verbose = true
      silent = false
    when "--help"
      STDERR.puts <<~HELP
        Usage: #{File.basename($0)} [OPTIONS...] --url <URL> <QUERY>
        QUERY
        The search term to find. If QUERY is surrounded with forward slashes /like this/, it will be parsed as a regular expression literal per https://ruby-doc.org/core-3.0.2/Regexp.html.
        URL
        The address of the Mastodon user API endpoint to search. It must match the pattern "/api/v1/accounts/\\d+/statuses". Falls back to the env variable MASTO_SEARCH_URL if left blank.
        OPTIONS
        --access-token
        The token to use in an Authorization header. Use this if you get a "401 Unauthorized" response. Get an access token from your profile's Developer settings.
        -a, --all
        Search for all matching posts instead of only the most recent one.
        --boosts
        Searches boosts as well as original posts.
        --url
        Specifies the URL of the Mastodon instance to search.
        -i, --case-insensitive
        Search without comparing letter case. When QUERY is a string, compared strings will be downcased per https://ruby-doc.org/core-3.0.2/String.html#method-i-downcase. When QUERY is a regular expression, the "ignore case" option will be used.
        --replies
        Searches replies as well as original posts.
        -s, --silent
        Silent mode. Disables output except for a found search result. Disables -v, --verbose.
        -v, --verbose
        Prints verbose output. Disables -s, --silent.
      HELP
      exit(0)
    end
  end
rescue GetoptLong::Error
  # GetoptLong has already written its own message to STDERR (it is not in
  # quiet mode); exit cleanly instead of dumping a backtrace.
  exit(1)
end
# The first positional argument left after option parsing is the search term.
query = ARGV.shift
if query.nil?
  STDERR.puts "Missing required search string or regex"
  exit(1)
end

# A query written as /source/flags becomes a Regexp; anything else stays a
# String (downcased up front when searching case-insensitively).
regexp_literal = query.match(%r{\A/(?<source>.*)/(?<options>[[:alpha:]]*)\z})
pattern = if regexp_literal
  flag_bits = {
    "i" => Regexp::IGNORECASE,
    "m" => Regexp::MULTILINE,
    "x" => Regexp::EXTENDED,
  }
  # Unrecognised flag letters are ignored. Folding from 0 keeps the result an
  # Integer even when no flags were given.
  flags = regexp_literal["options"].each_char.inject(0) do |bits, letter|
    bits | flag_bits.fetch(letter, 0)
  end
  flags |= Regexp::IGNORECASE if case_insensitive
  begin
    Regexp.new(regexp_literal["source"], flags)
  rescue RegexpError => e
    # Report a malformed expression the same way as other usage errors.
    STDERR.puts "Invalid regular expression #{query}: #{e.message}"
    exit(1)
  end
else
  case_insensitive ? query.downcase : query
end

# HTMLEntities is only defined when the optional "htmlentities" gem loaded.
html_entities = HTMLEntities.new if defined?(HTMLEntities)
# Extracts the URL of the next page from an HTTP Link header.
#
# @param link_header [String, nil] raw value of the response's Link header
# @return [String, nil] the target of the rel="next" link, or nil when the
#   header is absent or has no such link
def find_next_url(link_header)
  return if link_header.nil?

  # [^<>]* keeps the capture inside a single <...> target, so the result is
  # correct regardless of where rel="next" appears among the links. Whitespace
  # after the semicolon is optional.
  match = link_header.match(/<([^<>]*)>;\s*rel="next"/)
  match[1] if match
end
# Normalizes and validates the account statuses endpoint URL.
#
# A missing scheme defaults to https. Any query string or fragment is dropped,
# and the path must contain "/api/v1/accounts/<digits>/statuses".
#
# Prints a message to STDERR and exits with status 1 when the URL is missing,
# cannot be parsed, or has a non-matching path.
#
# @param url [String, nil] the user-supplied URL
# @return [String] the normalized URL without query or fragment
def validate_url(url)
  if url.nil? || url.empty?
    STDERR.puts "Missing required Mastodon search URL (set in env var MASTO_SEARCH_URL or argument --url)"
    exit(1)
  end

  url = "https://#{url}" unless url.match?(%r{\Ahttps?://})
  parsed = URI.parse(url)
  # Drop both query and fragment: a leftover fragment would swallow any query
  # string appended to the returned URL.
  parsed.query = nil
  parsed.fragment = nil
  unless parsed.path.to_s.match?(%r{/api/v1/accounts/\d+/statuses})
    raise URI::InvalidURIError, "Statuses path #{parsed.path.inspect} does not match format \"/api/v1/accounts/\\d+/statuses\""
  end

  parsed.to_s
rescue URI::InvalidURIError => e
  STDERR.puts(e)
  exit(1)
end
# Prints one matching status to STDOUT: its URI, author, timestamp and content.
#
# For a boost, the author shown is the account of the boosted status. A
# timestamp that cannot be parsed is printed as received.
#
# @param found [Hash] a status object decoded from the API response
def report_result(found)
  posted_at = begin
    Time.parse(found["created_at"]).strftime("%F %T")
  rescue StandardError
    found["created_at"]
  end

  # Look the author up under "reblog" when the status is a boost.
  author_path = found["reblog"] ? %w[reblog account] : %w[account]
  display_name = found.dig(*author_path, "display_name")
  acct = found.dig(*author_path, "acct")

  report_lines = [
    "Found: #{found["uri"]}",
    "Posted by: #{display_name} (#{acct})",
    "Posted at: #{posted_at}",
    "#{found["content"]}",
    "",
  ]
  STDOUT.puts report_lines.join("\n")
end
# Main search: page through the account's statuses, following the rel="next"
# link of each response, until a match is found (or, with --all, until the
# pages run out).
url_params = {
  limit: 100,
  exclude_reblogs: exclude_reblogs,
  exclude_replies: exclude_replies,
}
account_url = validate_url(masto_url)
STDERR.puts("Searching #{account_url} for #{pattern}") if verbose

# Only send an Authorization header when a token was actually supplied.
request_headers = access_token ? { "Authorization" => "Bearer #{access_token}" } : {}

# Formats a status timestamp as a date for verbose progress output; a
# timestamp that cannot be parsed is shown verbatim.
day_of = lambda do |status|
  stamp = status["created_at"]
  begin
    Time.parse(stamp).strftime("%F")
  rescue StandardError
    stamp.to_s
  end
end

results = []
next_url = "#{account_url}?#{URI.encode_www_form(url_params)}"
while (results.empty? || search_all) && next_url
  response = begin
    URI.parse(next_url).open(request_headers)
  rescue => e
    STDERR.puts
    STDERR.puts e
    exit(1)
  end
  next_url = find_next_url(response.meta["link"])
  # open-uri hands back a StringIO for small bodies and a Tempfile for large
  # ones; reading the object directly works for both.
  statuses = JSON.parse(response.read)
  response.close

  found = statuses.select do |status|
    content = status["content"].to_s
    content = html_entities.decode(content) if html_entities
    if pattern.is_a?(Regexp)
      # Case-insensitivity for regex queries is carried by the IGNORECASE flag.
      content.match?(pattern)
    else
      # Plain-string queries are matched literally. Use a non-mutating
      # downcase so the status printed later keeps its original text.
      content = content.downcase if case_insensitive
      content.include?(pattern)
    end
  end
  results.push(*found)

  if found.empty?
    if verbose && !statuses.empty?
      STDERR.puts "#{day_of.call(statuses.first)} - #{day_of.call(statuses.last)}: not found"
    elsif !silent
      STDERR.print(".")
    end
  elsif search_all
    found.each { |item| report_result(item) }
  else
    report_result(found.first)
  end
end

if results.empty?
  unless silent
    STDERR.puts # Print to a different line than the progress dots
    STDERR.puts "No matching posts found"
  end
  exit(1)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment