Skip to content

Instantly share code, notes, and snippets.

@atomgiant
Created May 7, 2020 16:56
Show Gist options
  • Save atomgiant/ba0d8c8cba0e6b32ea125a19b49c7b28 to your computer and use it in GitHub Desktop.
Save atomgiant/ba0d8c8cba0e6b32ea125a19b49c7b28 to your computer and use it in GitHub Desktop.
Shopify REST Client based on Excon
# Minimal Shopify REST Admin API client built on Excon.
#
# Relies on names provided by the host application: Excon, Rails.logger,
# JSON and the ActiveSupport core extensions (Hash#to_query, #present?,
# String#pluralize).
#
# Most methods return the parsed JSON payload for the resource. The
# paginated list_* methods return the ShopifyResponse itself so callers can
# read both #result and #next_page_info / #previous_page_info.
class ShopifyClient
  # Initialize the Shopify api client
  #
  # shopify_domain - the shop host name used to build request URLs
  # shopify_token  - the token sent in the X-Shopify-Access-Token header
  # api_version    - the Admin API version segment of the URL
  # max_retries    - how many times a request is retried after a 429
  #                  response before ShopifyTooManyRequestsError is raised
  def initialize(shopify_domain:, shopify_token:, api_version: '2020-01', max_retries: 10)
    @shopify_domain = shopify_domain
    @shopify_token = shopify_token
    @api_version = api_version
    @max_retries = max_retries
  end

  # ********** APPLICATION CHARGES **********

  # Get a single recurring application charge.
  #
  # application_charge_id - The id of the charge.
  def get_recurring_application_charge(application_charge_id)
    rsp = request(:get, "recurring_application_charges/#{application_charge_id}.json")
    rsp.result["recurring_application_charge"]
  end

  # Create a recurring application charge.
  #
  # options - supported options
  #
  # :name - The name of the charge
  # :price - The price of the charge
  # :return_url - The url to send the user back to after activation
  # :trial_days - Number of days for the trial, default is 0.
  # :test - Set to true to create a test charge, nil (default) to create a real charge
  def create_recurring_application_charge(options = {})
    charge = { trial_days: 0 }.merge(options)
    body = { recurring_application_charge: charge }.to_json
    rsp = request(:post, "recurring_application_charges.json", body: body)
    rsp.result["recurring_application_charge"]
  end

  # Activate an approved recurring application charge.
  #
  # application_charge_id - The id of the charge.
  def activate_recurring_application_charge(application_charge_id)
    rsp = request(:post, "recurring_application_charges/#{application_charge_id}/activate.json")
    rsp.result["recurring_application_charge"]
  end

  # List recurring application charges for a shop
  def list_recurring_application_charges
    rsp = request(:get, "recurring_application_charges.json")
    rsp.result["recurring_application_charges"]
  end

  # Cancel (delete) a recurring application charge.
  #
  # Returns the parsed response body.
  def cancel_recurring_application_charge(application_charge_id)
    rsp = request(:delete, "recurring_application_charges/#{application_charge_id}.json")
    rsp.result
  end

  # ********** COLLECTIONS **********

  # Get a Shopify collection
  def get_collection(collection_id)
    rsp = request(:get, "collections/#{collection_id}.json")
    rsp.result["collection"]
  end

  # List collects for a custom collection
  #
  # collection_id - the collection id
  # fields - optional comma-separated list of fields to return; defaults to all
  # limit - defaults to 250
  # page_info - the pagination token
  #
  # Returns the ShopifyResponse (use #result and #next_page_info).
  def list_collection_collects(collection_id, fields: nil, limit: 250, page_info: nil)
    opts = { fields: fields, limit: limit, page_info: page_info }
    # The collection filter is only sent with the first page; follow-up pages
    # are addressed by the page_info token alone.
    opts[:collection_id] = collection_id if page_info.nil?
    request(:get, path_with_query("collects.json", opts))
  end

  # List a Shopify collections products
  #
  # collection_id - the collection id
  # fields - optional comma-separated list of fields to return; defaults to all
  # limit - defaults to 250
  # page_info - the pagination token
  #
  # Returns the ShopifyResponse (use #result and #next_page_info).
  def list_collection_products(collection_id, fields: nil, limit: 250, page_info: nil)
    opts = { fields: fields, limit: limit, page_info: page_info }
    request(:get, path_with_query("collections/#{collection_id}/products.json", opts))
  end

  # ********** CUSTOM COLLECTIONS **********

  # Get a Shopify collection by its id
  def get_custom_collection(collection_id)
    rsp = request(:get, "custom_collections/#{collection_id}.json")
    rsp.result["custom_collection"]
  end

  # List custom collections
  #
  # fields - optional comma-separated list of fields to return; defaults to all
  # limit - defaults to 250
  # page_info - the pagination token
  #
  # Returns the ShopifyResponse (use #result and #next_page_info).
  def list_custom_collections(fields: nil, limit: 250, page_info: nil)
    opts = { fields: fields, limit: limit, page_info: page_info }
    request(:get, path_with_query("custom_collections.json", opts))
  end

  # Update custom collection
  #
  # options - supported options
  #
  # :title - The title
  # :body_html
  # :sort_order
  # :handle
  # :collects - an Array of collect Hashes: [ { id: 1234, product_id: 123}, ... ]
  def update_custom_collection(collection_id, options = {})
    body = build_custom_collection(options)
    rsp = request(:put, "custom_collections/#{collection_id}.json", body: body.to_json)
    rsp.result["custom_collection"]
  end

  # Internal helper: builds the custom collection request payload, copying
  # only the supported keys whose value is truthy. It is declared in the
  # public section of the class, so it stays publicly callable.
  def build_custom_collection(options = {})
    permitted = %i[title body_html sort_order handle collects]
    attrs = permitted.each_with_object({}) do |key, acc|
      acc[key] = options[key] if options[key]
    end
    { custom_collection: attrs }
  end

  # Delete collection
  #
  # Returns the parsed response body.
  def delete_custom_collection(collection_id)
    rsp = request(:delete, "custom_collections/#{collection_id}.json")
    rsp.result
  end

  # ********** SMART COLLECTIONS **********

  # Get a Shopify smart collection by its id
  def get_smart_collection(collection_id)
    rsp = request(:get, "smart_collections/#{collection_id}.json")
    rsp.result["smart_collection"]
  end

  # List smart collections
  #
  # fields - optional comma-separated list of fields to return; defaults to all
  # limit - defaults to 250
  # page_info - the pagination token
  #
  # Returns the ShopifyResponse (use #result and #next_page_info).
  def list_smart_collections(fields: nil, limit: 250, page_info: nil)
    opts = { fields: fields, limit: limit, page_info: page_info }
    request(:get, path_with_query("smart_collections.json", opts))
  end

  # Order the smart collection products.
  #
  # products - array of product ids, in the order to sort them
  # sort_order - the sort order, must be manual for products to work
  #
  # Returns the parsed response body.
  def order_smart_collection(collection_id, products: [], sort_order: nil)
    opts = { products: products, sort_order: sort_order }
    rsp = request(:put, "smart_collections/#{collection_id}/order.json", body: opts.compact.to_json)
    rsp.result
  end

  # ********** ORDERS **********

  # List orders for a store
  #
  # fields - optional comma-separated list of fields to return; defaults to all
  # processed_at_min - minimum processed at date in iso8601 format; defaults to nil
  # processed_at_max - maximum processed at date in iso8601 format; defaults to nil
  # limit - defaults to 250
  # status - order status; defaults to 'any'
  # page_info - the pagination token
  #
  # Returns the ShopifyResponse (use #result and #next_page_info).
  def list_orders(fields: nil, limit: 250, status: 'any', page_info: nil, processed_at_min: nil, processed_at_max: nil)
    opts = { fields: fields, limit: limit, page_info: page_info }
    # Filters are only sent with the first page; follow-up pages are
    # addressed by the page_info token alone.
    if page_info.nil?
      opts.merge!(processed_at_min: processed_at_min, processed_at_max: processed_at_max, status: status)
    end
    request(:get, path_with_query("orders.json", opts))
  end

  # ********** PRODUCTS **********

  # List products for a store
  #
  # fields - optional comma-separated list of fields to return; defaults to all
  # limit - defaults to 250
  # page_info - the pagination token
  #
  # Returns the ShopifyResponse (use #result and #next_page_info).
  def list_products(fields: nil, limit: 250, page_info: nil)
    opts = { fields: fields, limit: limit, page_info: page_info }
    request(:get, path_with_query("products.json", opts))
  end

  # Get a product
  #
  # id - The Shopify product id
  # options - The options Hash
  #
  # :fields - (optional) - the Array of product fields to get.
  def get_product(id, options = {})
    opts = {}
    opts[:fields] = options[:fields].join(",") if options[:fields]
    rsp = request(:get, path_with_query("products/#{id}.json", opts))
    rsp.result["product"]
  end

  # Update a product
  #
  # options - the product fields to update (restricted intentionally)
  #
  # :metafields - A list of metafield Hashes - keys: ["id", "namespace", "key", "value"]
  # :tags - A comma separated list of tags
  def update_product(id, options = {})
    attrs = {}
    attrs[:metafields] = options[:metafields] if options[:metafields].present?
    attrs[:tags] = options[:tags] if options[:tags].present?
    body = { product: attrs }.to_json
    rsp = request(:put, "products/#{id}.json", body: body)
    rsp.result["product"]
  end

  # Count the products of a store
  def count_products
    rsp = request(:get, "products/count.json")
    rsp.result["count"]
  end

  # ********** METAFIELDS **********

  # Get a list of collection or product metafields.
  #
  # object_type - :collection OR :product
  # object_id - the collection or product id
  #
  # options
  #
  # :namespace - the metafield namespace
  #
  # Raises a RuntimeError for any other object_type.
  def get_metafields(object_type, object_id, options = {})
    raise "Invalid object_type: #{object_type}" unless [:collection, :product].include?(object_type)

    opts = {}
    opts[:namespace] = options[:namespace] if options[:namespace].present?
    rsp = request(:get, path_with_query("#{object_type.to_s.pluralize}/#{object_id}/metafields.json", opts))
    rsp.result["metafields"]
  end

  # ********** SHOP **********

  # Get the shop details
  def get_shop
    rsp = request(:get, "shop.json")
    rsp.result["shop"]
  end

  # ********** THEMES **********

  # Get a list of themes for a shop
  #
  # options
  # :role -> e.g. - 'main'
  def list_themes(options = {})
    opts = {}
    opts[:role] = options[:role] if options[:role]
    rsp = request(:get, path_with_query("themes.json", opts))
    rsp.result['themes']
  end

  # Get a single theme asset by key
  #
  # theme_id - the Shopify theme id
  # key - the asset path - e.g. snippets/foo.liquid
  def get_theme_asset(theme_id, key)
    opts = { asset: { key: key } }
    rsp = request(:get, path_with_query("themes/#{theme_id}/assets.json", opts))
    rsp.result['asset']
  end

  # Update a theme asset
  #
  # theme_id - the Shopify theme id
  # key - the asset path - e.g. snippets/foo.liquid
  # value - the new value to store in the theme file
  def update_theme_asset(theme_id, key, value)
    body = { asset: { key: key, value: value } }.to_json
    rsp = request(:put, "themes/#{theme_id}/assets.json", body: body)
    rsp.result["asset"]
  end

  # ********** WEBHOOKS **********

  # Create one webhook
  #
  # topic - the webhook topic
  # address - the address the webhook is delivered to
  def create_webhook(topic, address)
    body = {
      webhook: {
        topic: topic,
        address: address,
        format: 'json'
      }
    }.to_json
    rsp = request(:post, 'webhooks.json', body: body)
    rsp.result["webhook"]
  end

  # Delete one webhook
  #
  # Returns the parsed response body.
  def delete_webhook(id)
    rsp = request(:delete, "webhooks/#{id}.json")
    rsp.result
  end

  # List the webhooks of a shop
  def list_webhooks
    rsp = request(:get, "webhooks.json")
    rsp.result['webhooks']
  end

  # Sends a request to Shopify
  #
  # method - the http method (e.g. :get, :post)
  # path - the relative Shopify path, without a leading slash (e.g.: "products.json")
  # options -
  # :body - optional request body
  #
  # A 429 response is retried after a randomized 1-2 second pause, at most
  # max_retries times (see #initialize).
  #
  # On Success returns a ShopifyClient::ShopifyResponse
  # On Error raises a ShopifyClient::ShopifyError
  # When still throttled after the last retry raises a
  # ShopifyClient::ShopifyTooManyRequestsError
  def request(method, path, options = {})
    headers = {
      "Accept" => "application/json",
      "Content-Type" => "application/json",
      "X-Shopify-Access-Token" => @shopify_token,
    }
    attempts = 0
    begin
      send_request(path, headers, options[:body], method)
    rescue ShopifyTooManyRequestsError
      attempts += 1
      # Give up instead of looping forever when the shop stays throttled.
      raise if attempts > @max_retries

      # Kernel#rand without an argument yields a float in [0, 1), so this is
      # a real 1-2 second jitter (rand(1) would always be 0).
      retry_seconds = (1 + rand).round(2)
      Rails.logger.info("Shopify 429 Too many requests - retrying in #{retry_seconds} seconds")
      sleep(retry_seconds)
      retry
    end
  end

  private

  # Performs a single HTTP round trip.
  #
  # Returns a ShopifyResponse for status 200 or 201. Raises
  # ShopifyTooManyRequestsError for 429 and ShopifyError (after logging the
  # status and body) for every other status.
  def send_request(path, headers, body, method)
    excon_response = Excon.new(shopify_api_url(path), headers: headers, body: body)
                          .request(method: method, read_timeout: 240, write_timeout: 240)

    case excon_response.status
    when 200, 201
      # all good, fall through
    when 429
      raise ShopifyTooManyRequestsError
    else
      message = "Shopify error - status: #{excon_response.status}, method: #{method}, path: #{path}"
      Rails.logger.error(message)
      Rails.logger.error(excon_response.body)
      raise ShopifyError.new(message, excon_response)
    end

    # Guarded so the body is only interpolated when debug logging is enabled.
    Rails.logger.debug("Shopify API response:\n#{excon_response.body}") if Rails.logger.debug?
    ShopifyResponse.new(excon_response)
  end

  # Builds the absolute Admin API URL for a relative path.
  def shopify_api_url(path)
    "https://#{@shopify_domain}/admin/api/#{@api_version}/#{path}"
  end

  # Appends the non-nil params to the path as a query string. The "?" is
  # only added when there is at least one parameter to send.
  def path_with_query(path, params)
    query = params.compact.to_query
    query.empty? ? path : "#{path}?#{query}"
  end

  # Wraps a successful HTTP response and gives access to the parsed JSON
  # body and to the pagination tokens found in the Link header.
  class ShopifyResponse
    attr_reader :response

    # response - the raw HTTP response (responds to headers, status, body)
    def initialize(response)
      @response = response
    end

    def headers
      response.headers
    end

    def status
      response.status
    end

    # The response body parsed as JSON (memoized).
    def result
      @result ||= JSON.parse(response.body)
    end

    # The page_info token of the next page, or nil when there is none.
    def next_page_info
      page_info(:next)
    end

    # The page_info token of the previous page, or nil when there is none.
    def previous_page_info
      page_info(:previous)
    end

    # Extracts the page_info token from the Link header entry whose rel
    # matches type (:next or :previous). Returns nil when no entry matches.
    def page_info(type)
      links = (headers["Link"] || "").split(",")
      link = links.find { |entry| entry.include?(%(rel="#{type}")) } || ""
      link[/page_info=([a-zA-Z0-9_-]+)/, 1]
    end
  end

  # Raised for every response status other than 200, 201 and 429. Keeps the
  # raw response so callers can inspect the status and the reported errors.
  class ShopifyError < StandardError
    attr_reader :response

    def initialize(message, response)
      super(message)
      @response = response
    end

    # The "errors" member of the JSON response body (a Hash, an Array or a
    # String, depending on what was sent back). Returns [] when the body is
    # not valid JSON or carries no "errors" member.
    def errors
      @errors ||= parse_errors
    end

    # Human readable error messages as an Array of Strings.
    #
    # A Hash of field => messages becomes "Field message" entries; a plain
    # String or Array of errors is returned as a list of strings.
    def full_error_messages
      case errors
      when Hash
        errors.map { |field, messages| [field.to_s.capitalize, messages].join(" ") }
      else
        Array(errors).map(&:to_s)
      end
    end

    def status
      response.status
    end

    private

    # Only JSON parse failures are treated as "no errors available"; any
    # other exception is left to propagate.
    def parse_errors
      parsed = JSON.parse(@response.body.to_s)
      (parsed.is_a?(Hash) && parsed["errors"]) || []
    rescue JSON::ParserError
      []
    end
  end

  # Raised when Shopify answers with status 429 (too many requests).
  class ShopifyTooManyRequestsError < StandardError; end
end
@atomgiant
Copy link
Author

To get paginated Shopify products:

    rsp = shopify_client.list_products(limit: 250)
    rsp.result["products"].each do |e|
      # process the product
    end

    if rsp.next_page_info
      # fetch the next page
      rsp = shopify_client.list_products(limit: 250, page_info: rsp.next_page_info)
    else
      # no more pages to fetch
    end

@forsbergplustwo
Copy link

❤️

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment