Skip to content

Instantly share code, notes, and snippets.

@vschiavoni
Created June 18, 2015 14:27
Show Gist options
  • Save vschiavoni/753982f4fa31cc56f0e6 to your computer and use it in GitHub Desktop.
Save vschiavoni/753982f4fa31cc56f0e6 to your computer and use it in GitHub Desktop.
## This file is part of Splay.
##
## Splayd is free software: you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published
## by the Free Software Foundation, either version 3 of the License,
## or (at your option) any later version.
##
## Splayd is distributed in the hope that it will be useful,but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
## See the GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Splayd. If not, see <http://www.gnu.org/licenses/>.
$dbt = DBUtils.get_new_mysql_sequel
# $new_dbt = DBUtils.get_new_mysql
class SplaydServer
@@ssl = SplayControllerConfig::SSL
@@splayd_threads = {}
def self.threads() return @@splayd_threads end
def self.threads=(threads) @@splayd_threads = threads end
def initialize(port = nil)
@port = port || SplayControllerConfig::SplaydPort
end
def run
return Thread.new() do
main
end
end
def main
begin
server = TCPServer.new(@port)
server.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
if @@ssl
# SSL key and cert
key = OpenSSL::PKey::RSA.new 512
cert = OpenSSL::X509::Certificate.new
cert.not_before = Time.now
cert.not_after = Time.now + 3600
cert.public_key = key.public_key
cert.sign(key, OpenSSL::Digest::SHA1.new)
# SSL context
ctx = OpenSSL::SSL::SSLContext.new
ctx.key = key
ctx.cert = cert
server = OpenSSL::SSL::SSLServer.new(server, ctx)
$log.info("Waiting for splayds on port (SSL): " + @port.to_s)
else
$log.info("Waiting for splayds on port: " + @port.to_s)
end
rescue => e
$log.fatal(e.class.to_s + ": " + e.to_s + "\n" + e.backtrace.join("\n"))
return
end
# Protect accept() For example, a bad SSL negotiation makes accept()
# to raise an exception. Can protect against that and a DOS.
begin
loop do
so = server.accept
tmpSocket = LLenc.new(so)
splaydProtocol = tmpSocket.read()
$log.debug(splaydProtocol)
if splaydProtocol == "standard" then
SplaydProtocol.new(so).run
elsif splaydProtocol == "grid" then
SplaydGridProtocol.new(so).run
end
end
rescue => e
$log.error(e.class.to_s + ": " + e.to_s + "\n" + e.backtrace.join("\n"))
sleep 1
retry
end
end
end
class SplaydProtocol
class RegisterError < StandardError; end
class ProtocolError < StandardError; end
@@sleep_time = SplayControllerConfig::SPSleepTime
@@ping_interval = SplayControllerConfig::SPPingInterval
@@socket_timeout = SplayControllerConfig::SPSocketTimeout
@@logd_ip = SplayControllerConfig::LogdIP
@@logd_port = SplayControllerConfig::LogdPort
@@log_max_size = SplayControllerConfig::LogMaxSize
@@num_logd = SplayControllerConfig::NumLogd
@@localize = SplayControllerConfig::Localize
@@nat_gateway_ip = SplayControllerConfig::NATGatewayIP
@splayd = nil
@ip = nil
@so_ori = nil
@so = nil
def initialize(so)
@ip = so.peeraddr[3]
@so_ori = so
@so = LLenc.new(so)
@so.set_timeout(@@socket_timeout)
end
def run
return Thread.new do
begin
auth
main
rescue DBI::Error => e
$log.fatal(e.class.to_s + ": " + e.to_s + "\n" + e.backtrace.join("\n"))
rescue => e
# "normal" situation
$log.warn(e.class.to_s + ": " + e.to_s)
ensure
# When the thread is killed, this part is NOT threaded !
if @splayd
$log.info("Thread of splayd (#{@splayd[:key]}) will end now.")
else
$log.info("Thread of splayd (ip: #{@ip}) will end now.")
end
if @splayd
SplaydServer.threads.delete(@splayd.id)
end
begin; @so_ori.close; rescue; end
end
end
end
def refused(msg)
@so.write "REFUSED"
@so.write msg
raise RegisterError, msg
end
#
def pre_auth
end
def auth_update_lib
end
# Initialize splayd connection, authenticate, session, ...
def auth
$log.debug("A splayd (#{@ip}) try to connect.")
if @so.read != "KEY" then raise ProtocolError, "KEY" end
key = addslashes(@so.read)
session = addslashes(@so.read)
ok = true
@splayd = Splayd.new(key)
if not @splayd[:id] or @splayd[:status] == "DELETED"
refused "That splayd doesn't exist or was deleted: #{key}"
end
if @@nat_gateway_ip and @ip == @@nat_gateway_ip
if key =~ /NAT_([^_]*)_.*/ or key =~ /NAT_(.*)/
$log.info("#{@splayd}: IP change (NAT) from #{@ip} to #{$1}")
@ip = $1
else
$log.info("#{@splayd[:id]}: IP of NAT gateway without replacement.")
end
end
## This restriction is way too restrictive, and it makes impossible
## to deploy several splayds on the same phisical machine, a typical
## scenario in cluster deployments.
##if not @splayd.ip_check(@ip)
## refused "Your IP is already used by another splayd."
##end
if not @splayd.check_and_set_preavailable
refused "Your splayd is already connected. " +
"Try to kill an existing process or wait " +
"2 minutes and retry."
end
# From here if there is not an external error (socket or db problem), the
# splayd will be accepted.
old_ip = @splayd[:ip]
begin
SplaydServer.threads[@splayd[:id]] = Thread.current
# update ip if needed
if @ip != old_ip
@splayd.update(:ip=>@ip)
end
# check if we can restore the session or not
if session != @splayd[:session] or @ip != old_ip
same = false
@splayd.reset # (change session too)
else
same = true
end
# Implemented only in JobdGrid as of now
auth_update_lib()
@so.write "OK"
@so.write @splayd[:session]
if same
$log.info("#{@splayd}: Session OK")
else
@so.write "INFOS"
@so.write @ip
if @so.read != "OK" then raise ProtocolError, "INFOS not OK" end
infos = @so.read # no addslashes (json)
@splayd.insert_splayd_infos(infos)
@splayd.update_splayd_infos()
bl = Splayd.blacklist
@so.write "BLACKLIST"
@so.write bl.to_json
if @so.read != "OK" then raise ProtocolError, "BLACKLIST not OK" end
logv = {}
logv['ip'] = @@logd_ip
logv['port'] = @@logd_port + rand(@@num_logd)
logv['max_size'] = @@log_max_size
@so.write "LOG"
@so.write logv.to_json
if @so.read != "OK" then raise ProtocolError, "LOG not OK" end
$log.info("#{@splayd}: Log port: #{logv['port']}")
end
$log.info("#{@splayd}: Auth OK")
@splayd.available
rescue => e
# restore previous status (REGISTER, UNAVAILABLE or RESET)
@splayd.update("status", @splayd.row['status'])
raise e
end
if @ip != old_ip and @@localize
$log.info("#{@splayd}: Localization")
@splayd.localize
end
# TODO Invariant check @splayd.row must be == to a new fetch of infos
end
def main
begin
last_contact = @splayd.last_contact
running = true
while running
action = @splayd.next_action
if not action
if Time.now.to_i - last_contact > @@ping_interval
# "Inlining PING" Avoid 2 DB operations
@so.write "PING"
if @so.read != "OK" then raise ProtocolError, "PING not OK" end
last_contact = @splayd.last_contact
end
sleep(rand(@@sleep_time * 2 * 100).to_f / 100)
else
$log.debug("#{@splayd}: Action #{action['command']}")
start_time = Time.now.to_f
@so.write action['command']
if action['data']
if action['command'] == 'LIST' and action['position']
action['data'] = action['data'].sub(/_POSITION_/, action['position'].to_s)
end
@so.write action['data']
end
reply_code = @so.read
if reply_code == "OK"
if action['command'] == "REGISTER"
port = addslashes(@so.read)
reply_data = port
end
if action['command'] == "STATUS"
reply_data = @so.read # no addslashes (json)
end
if action['command'] == "LOADAVG"
reply_data = addslashes(@so.read)
end
if action['command'] == "HALT" or action['command'] == "KILL"
running = false
end
end
reply_time = Time.now.to_f - start_time
# We tolerate some errors because one command
# can be sent twice if there is a controller failure
# juste after the send. But REGISTER can not have an
# error because we don't re-send it, we send an
# FREE then REGISTER again to avoid that.
# All the @db.s_j_* functions are replayable.
if action['command'] == "REGISTER"
if reply_code == "OK"
# Update the job slot from RESERVED to WAITING
@splayd.s_j_register(action['job_id'])
@splayd.s_sel_reply(action['job_id'], reply_data, reply_time)
else
raise ProtocolError, "REGISTER not OK: #{reply_code}"
end
end
if action['command'] == "START"
if reply_code == "OK" or reply_code == "RUNNING"
@splayd.s_j_start(action['job_id'])
else
raise ProtocolError, "START not OK: #{reply_code}"
end
end
if action['command'] == "STOP"
if reply_code == "OK" or reply_code == "NOT_RUNNING"
@splayd.s_j_stop(action['job_id'])
else
raise ProtocolError, "STOP not OK: #{reply_code}"
end
end
if action['command'] == "FREE"
@splayd.s_j_free(action['job_id'])
end
if action['command'] == "STATUS"
@splayd.s_j_status(reply_data)
end
if action['command'] == "LOADAVG"
@splayd.parse_loadavg(reply_data)
end
# We will remove the action here so, if the
# controller crash between the reply and here, we
# will do (or redo) the proper DB things.
@splayd.remove_action(action)
last_contact = @splayd.last_contact
end
end
ensure
@splayd.unavailable
@splayd.action_failure
end
end
end
# TODO
# This class may appear in its own file or not.
# Passing the protocol as an argument in SplaydServerß could allow
# better reusability
class SplaydGridProtocol < SplaydProtocol
def auth_update_lib
#$log.info( "auth_update_lib")
in_table = @so.read # sent through json
$log.debug(in_table)
in_table = JSON.parse(in_table)
#$log.info("receive library list:")
out_table = []
#UPDATE libs that exist on the splayds
# LIST ALL SHA1
old_libs = []
# DELETE ALL AND ADD OLD BUT VALID AND THE NEW LIBS INTO THE TABLE splayd_libs
$db.do("DELETE FROM splayd_libs WHERE splayd_id='#{@splayd.row['id']}'")
in_table.each do |lib_pair|
tmp_lib = $db[:libs].where(lib_sha1=>"#{lib_pair['sha1']}") #.fetch("SELECT * FROM libs WHERE lib_sha1='#{lib_pair['sha1']}'")
if tmp_lib then
$db.do("INSERT INTO splayd_libs SET splayd_id='#{@splayd.row['id']}', lib_id='#{tmp_lib['id']}' ")
else
old_libs.push(lib_pair)
end
end
old_libs_json = JSON.unparse(old_libs)
@so.write old_libs_json
if @so.read != "OK" then raise ProtocolError, "UPDATE LIB NOT OK" end
end
def main
begin
last_contact = @splayd.last_contact
running = true
while running
action = @splayd.next_action
if not action
if Time.now.to_i - last_contact > @@ping_interval
# "Inlining PING" Avoid 2 DB operations
@so.write "PING"
if @so.read != "OK" then raise ProtocolError, "PING not OK" end
last_contact = @splayd.last_contact
end
sleep(rand(@@sleep_time * 2 * 100).to_f / 100)
else
lib_id = nil
start_time = Time.now.to_f
@so.write action['command']
if action['data']
if action['command'] == 'LIST' and action['position']
action['data'] = action['data'].sub(/_POSITION_/, action['position'].to_s)
elsif action['command'] == "REGISTER"
job = action['data']
job = JSON.parse(job)
if job['lib_name' ] && job['lib_name'] != ""
lib = $db.fetch("SELECT * FROM splayd_libs, libs WHERE splayd_libs.lib_id=libs.id AND splayd_libs.splayd_id=#{@splayd.row['id']}
AND libs.lib_name='#{job['lib_name']}' AND libs.lib_version='#{job['lib_version']}'")
if not lib #$log.debug("Send the lib to the splayd and add it in splayd_libs #{@splayd.row['architecture']} AND lib_os=#{@splayd['os']}")
lib = $db.fetch("SELECT * FROM libs WHERE lib_name='#{job['lib_name']}' AND lib_version='#{job['lib_version']}'
AND lib_arch='#{@splayd.row['architecture']}' AND lib_os='#{@splayd.row['os']}'")
job['lib_code'] = lib['lib_blob']
job['lib_sha1'] = lib['lib_sha1']
lib_id = lib['id']
end
job['lib_sha1'] = lib['lib_sha1']
job = job.to_json
action['data'] = job
end
end
@so.write action['data']
end
reply_code = @so.read
if reply_code == "OK"
if action['command'] == "REGISTER"
if lib_id != nil then $db.do("INSERT INTO splayd_libs SET splayd_id='#{@splayd.row['id']}', lib_id='#{lib_id}'") end
port = addslashes(@so.read)
reply_data = port
end
if action['command'] == "STATUS"
reply_data = @so.read # no addslashes (json)
end
if action['command'] == "LOADAVG"
reply_data = addslashes(@so.read)
end
if action['command'] == "HALT" or action['command'] == "KILL"
running = false
end
end
reply_time = Time.now.to_f - start_time
# We tolerate some errors because one command
# can be sent twice if there is a controller failure
# juste after the send. But REGISTER can not have an
# error because we don't re-send it, we send an
# FREE then REGISTER again to avoid that.
# All the @db.s_j_* functions are replayable.
if action['command'] == "REGISTER"
if reply_code == "OK"
# Update the job slot from RESERVED to WAITING
@splayd.s_j_register(action['job_id'])
@splayd.s_sel_reply(action['job_id'], reply_data, reply_time)
else
raise ProtocolError, "REGISTER not OK: #{reply_code}"
end
end
if action['command'] == "START"
if reply_code == "OK" or reply_code == "RUNNING"
@splayd.s_j_start(action['job_id'])
else
raise ProtocolError, "START not OK: #{reply_code}"
end
end
if action['command'] == "STOP"
if reply_code == "OK" or reply_code == "NOT_RUNNING"
@splayd.s_j_stop(action['job_id'])
else
raise ProtocolError, "STOP not OK: #{reply_code}"
end
end
if action['command'] == "FREE"
@splayd.s_j_free(action['job_id'])
end
if action['command'] == "STATUS"
@splayd.s_j_status(reply_data)
end
if action['command'] == "LOADAVG"
@splayd.parse_loadavg(reply_data)
end
# We will remove the action here so, if the
# controller crash between the reply and here, we
# will do (or redo) the proper DB things.
@splayd.remove_action(action)
last_contact = @splayd.last_contact
end
end
ensure
@splayd.unavailable
@splayd.action_failure
end
end
end
class Splayd
attr_accessor :row
attr_reader :id
@@transaction_mutex = Mutex.new
@@unseen_timeout = 3600
@@auto_add = SplayControllerConfig::AutoAddSplayds
def initialize(id)
@row = $db[:splayds].first(:id=>id)
if not @row
@row = $db[:splayds].first(:key=>id)
end
if not @row and @@auto_add
$db[:splayds].insert(:key=>id)
@row = $db[:splayds].where(:key=>id)
end
if @row then @id = @row.get(:id) end
$log.debug("Splayd #{id} initialized")
end
def self.init
$db[:splayds].where{status=='AVAILABLE' || status=='PREAVAILABLE'}.update(:status=>'UNAVAILABLE')
#$db.do "UPDATE splayds
# SET status='UNAVAILABLE'
# WHERE
# status='AVAILABLE' or status='PREAVAILABLE'"
Splayd.reset_actions
Splayd.reset_unseen
end
def self.reset_unseen
$db.fetch "SELECT * FROM splayds WHERE
last_contact_time<'#{Time.now.to_i - @@unseen_timeout}' AND
(status='AVAILABLE' OR
status='UNAVAILABLE' OR
status='PREAVAILABLE')" do |splayd|
$log.debug("Splayd #{splayd['id']} (#{splayd['ip']} - #{splayd['status']}) not seen " +
"since #{@@unseen_timeout} seconds (#{splayd['last_contact_time']}) => RESET")
# We kill the thread if there is one
s = Splayd.new(splayd['id'])
s.kill
s.reset
end
end
def self.reset_actions
# When the controller start, if some actions where send but still not
# replied, we will never receive the reply so we set the action to the
# FAILURE status.
#$db.do "UPDATE actions SET status='FAILURE' WHERE status='SENDING'"
$db[:actions].where{status=='SENDING'}.update(:status=>'FAILURE')
# Uncomplete actions, jobd should put the again.
$db[:actions].where(:status=>'TEMP').delete #$db.do "DELETE FROM actions WHERE status='TEMP'"
end
def self.gen_session
return OpenSSL::Digest::MD5.hexdigest(rand(1000000).to_s + "session" + rand(1000000).to_s)
end
def self.has_job(splayd_id, job_id)
sj = $db.fetch "SELECT * FROM splayd_jobs
WHERE splayd_jobs.splayd_id='#{splayd_id}' AND
splayd_jobs.job_id='#{job_id}'"
if sj then return true else return false end
end
# Send an action to a splayd only if it is active.
# For performance reasons, we will not check anymore the availability because
# 99.9% of time, when an action is sent, the splayd is available. This should
# have no consequences (other than a little DB space) because when the splayd
# comes back from a reset state, it will be reset() and the commands deleted.
def self.add_action(splayd_id, job_id, command, data = '')
$db.do "INSERT INTO actions SET
splayd_id='#{splayd_id}',
job_id='#{job_id}',
command='#{command}',
data='#{addslashes data}'"
return true
# full version follow (when not running in controller :-)
#splayd = $db.select_one "SELECT status FROM splayds WHERE id='#{splayd_id}'"
# Even UNAVAILABLE, the splayd IS active !
#if splayd['status'] == 'AVAILABLE' or splayd['status'] == 'UNAVAILABLE'
#$db.do "INSERT INTO actions SET
#splayd_id='#{splayd_id}',
#job_id='#{job_id}',
#command='#{command}',
#data='#{addslashes data}'"
#true
#else
#false
#end
end
def self.blacklist
hosts = []
$db[:blacklist_hosts].select(:host) do |row| #.select_all "SELECT host FROM blacklist_hosts"
hosts << row[0]
end
return hosts
end
def self.localize_all
return Thread.new do
$db[:splayds].select(:id) do |s| #.select_all "SELECT id FROM splayds"
splayd = Splayd.new(s['id'])
splayd.localize
end
end
end
def to_s
if @row['name'] and @row['ip']
return "#{@id} (#{@row.get(:name)}, #{@row.get(:ip)})"
elsif @row['ip']
return "#{@id} (#{@row.get(:ip)})"
else
return "#{@id}"
end
end
def check_and_set_preavailable
r = false
# to protect the $dbt object while in use.
@@transaction_mutex.synchronize do
$dbt.transaction do
status= $dbt[:splayds].where(:id=>@id).get(:status)
if status == 'REGISTERED' or status == 'UNAVAILABLE' or status == 'RESET' then
$dbt.do "UPDATE splayds SET
status='PREAVAILABLE'
WHERE id ='#{@id}'"
r = true
end
end # COMMIT issued only here
end
return r
end
# Check that this IP is not used by another splayd.
def ip_check ip
if ip == "127.0.0.1" or ip=="::ffff:127.0.0.1" or not $db.fetch "SELECT * FROM splayds WHERE
ip='#{ip}' AND
`key`!='#{@row.get(:key)}' AND
(status='AVAILABLE' OR status='UNAVAILABLE' OR status='PREAVAILABLE')"
true
else
false
end
end
def insert_splayd_infos infos
infos = JSON.parse infos
if infos['status']['endianness'] == 0
infos['status']['endianness'] = "little"
else
infos['status']['endianness'] = "big"
end
# We don't update ip, key, session and localization infomrations here
$db.do "UPDATE splayds SET
name='#{addslashes(infos['settings']['name'])}',
version='#{addslashes(infos['status']['version'])}',
protocol='#{addslashes(infos['settings']['protocol'])}',
lua_version='#{addslashes(infos['status']['lua_version'])}',
bits='#{addslashes(infos['status']['bits'])}',
endianness='#{addslashes(infos['status']['endianness'])}',
os='#{addslashes(infos['status']['os'])}',
full_os='#{addslashes(infos['status']['full_os'])}',
architecture='#{addslashes(infos['status']['architecture'])}',
start_time='#{addslashes((Time.now.to_f - infos['status']['uptime'].to_f).to_i)}',
max_number='#{addslashes(infos['settings']['job']['max_number'])}',
max_mem='#{addslashes(infos['settings']['job']['max_mem'])}',
disk_max_size='#{addslashes(infos['settings']['job']['disk']['max_size'])}',
disk_max_files='#{addslashes(infos['settings']['job']['disk']['max_files'])}',
disk_max_file_descriptors='#{addslashes(infos['settings']['job']['disk']['max_file_descriptors'])}',
network_max_send='#{addslashes(infos['settings']['job']['network']['max_send'])}',
network_max_receive='#{addslashes(infos['settings']['job']['network']['max_receive'])}',
network_max_sockets='#{addslashes(infos['settings']['job']['network']['max_sockets'])}',
network_max_ports='#{addslashes(infos['settings']['job']['network']['max_ports'])}',
network_send_speed='#{addslashes(infos['settings']['network']['send_speed'])}',
network_receive_speed='#{addslashes(infos['settings']['network']['receive_speed'])}'
WHERE id='#{@id}'"
parse_loadavg(infos['status']['loadavg'])
end
def update_splayd_infos
@row = $db[:splayds].first(:id=>@id)
end
def localize
if @row.get(:ip) and
not @row.get(:ip) == "127.0.0.1" and
not @row.get(:ip) =~ /192\.168\..*/ and
not @row.get(:ip) =~ /10\.0\..*/
$log.debug("Trying to localize: #{@row.get(:ip)}")
begin
hostname = ""
begin
Timeout::timeout(10, StandardError) do
hostname = Resolv::getname(@row['ip'])
end
rescue
$log.warn("Timeout resolving hostname of IP: #{@row['ip']}")
end
loc = Localization.get(@row['ip'])
$log.info("#{@id} #{@row['ip']} #{hostname} " +
"#{loc.country_code2.downcase} #{loc.city_name}")
$db.do "UPDATE splayds SET
hostname='#{hostname}',
country='#{loc.country_code2.downcase}',
city='#{loc.city_name}',
latitude='#{loc.latitude}',
longitude='#{loc.longitude}'
WHERE id='#{@id}'"
rescue => e
puts e
$log.error("Impossible localization of #{@row['ip']}")
end
end
end
def remove_action action
$db.do "DELETE FROM actions WHERE id='#{action['id']}'"
end
def update(field, value)
$db.do "UPDATE splayds SET #{field}='#{value}' WHERE id='#{@id}'"
@row[field] = value
end
def kill
if SplaydServer.threads[@id]
SplaydServer.threads.delete(@id).kill
end
end
# DB cleaning when a splayd is reset.
def reset
@row['session'] = Splayd.gen_session
$db.do "UPDATE splayds SET
status='RESET', session='#{@row['session']}' WHERE id='#{@id}'"
$db.do "DELETE FROM actions WHERE splayd_id='#{@id}'"
$db.do "DELETE FROM splayd_jobs WHERE splayd_id='#{@id}'"
$db.do "INSERT INTO splayd_availabilities SET
splayd_id='#{@id}', status='RESET', time='#{Time.now.to_i}'"
# for trace job
$db.do "UPDATE splayd_selections SET reset='TRUE' WHERE splayd_id='#{@id}'"
end
def unavailable
$db.do "UPDATE splayds SET status='UNAVAILABLE' WHERE id='#{@id}'"
$db.do "INSERT INTO splayd_availabilities SET
splayd_id='#{@id}',
status='UNAVAILABLE',
time='#{Time.now.to_i}'"
end
def action_failure
$db.do "UPDATE actions SET status='FAILURE'
WHERE status='SENDING' AND splayd_id='#{@id}'"
end
def available
$db.do "UPDATE splayds SET status='AVAILABLE' WHERE id='#{@id}'"
$db.do "INSERT INTO splayd_availabilities SET
splayd_id='#{@id}',
ip='#{@row['ip']}',
status='AVAILABLE',
time='#{Time.now.to_i}'"
last_contact
restore_actions
end
def last_contact
$db.do "UPDATE splayds SET
last_contact_time='#{Time.now.to_i}' WHERE id='#{@id}'"
return Time.now.to_i
end
# Restore actions in failure state.
def restore_actions
$db.select_all "SELECT * FROM actions WHERE
status='FAILURE' AND
splayd_id='#{@id}'" do |action|
if action['command'] == 'REGISTER'
# We should put the FREE-REGISTER at the same place
# where REGISTER was. But, no other register action concerning
# this splayd and this job can exists (because registering is
# split into states), so, if we remove the REGISTER, we can safely
# add the FREE-REGISTER commands at the top of the
# actions.
job = $db.select_one "SELECT ref FROM jobs WHERE id='#{action['job_id']}'"
$db.do "DELETE FROM actions WHERE id='#{action['id']}'"
Splayd.add_action(action['splayd_id'], action['job_id'], 'FREE', job['ref'])
Splayd.add_action(action['splayd_id'], action['job_id'], 'REGISTER', addslashes(job['code']))
else
$db.do "UPDATE actions SET status='WAITING' WHERE id='#{action['id']}'"
end
end
end
# Return the next WAITING action and set status to SENDING.
def next_action
action = $db.select_one "SELECT * FROM actions WHERE
splayd_id='#{@id}' ORDER BY id LIMIT 1"
$log.debug("next action to do: #{action}")
if action
if action['status'] == 'TEMP'
$log.info("INCOMPLETE ACTION: #{action['command']} " +
"(splayd: #{@id}, job: #{action['job_id']})")
end
if action['status'] == 'WAITING'
$db.do "UPDATE actions SET
status='SENDING'
WHERE id='#{action['id']}'"
return action
end
end
nil
end
def s_j_register job_id
$db.do "UPDATE splayd_jobs SET
status='WAITING'
WHERE
splayd_id='#{@id}' AND
job_id='#{job_id}' AND
status='RESERVED'"
end
def s_j_free job_id
$db.do "DELETE FROM splayd_jobs WHERE
splayd_id='#{@id}' AND
job_id='#{job_id}'"
end
def s_j_start job_id
$db.do "UPDATE splayd_jobs SET
status='RUNNING'
WHERE
splayd_id='#{@id}' AND
job_id='#{job_id}'"
end
def s_j_stop job_id
$db.do "UPDATE splayd_jobs SET
status='WAITING'
WHERE
splayd_id='#{@id}' AND
job_id='#{job_id}'"
end
def s_j_status data
data = JSON.parse data
$db.select_all "SELECT * FROM splayd_jobs WHERE
splayd_id='#{@id}' AND
status!='RESERVED'" do |sj|
job = $db.select_one "SELECT ref FROM jobs WHERE id='#{sj['job_id']}'"
# There is no difference in Lua between Hash and Array, so when it's
# empty (an Hash), we encoded it like an empy Array.
if data['jobs'].class == Hash and data['jobs'][job['ref']]
if data['jobs'][job['ref']]['status'] == "waiting"
$db.do "UPDATE splayd_jobs SET status='WAITING'
WHERE id='#{sj['id']}'"
end
# NOTE normally no needed because already set to RUNNING when
# we send the START command.
if data['jobs'][job['ref']]['status'] == "running"
$db.do "UPDATE splayd_jobs SET status='RUNNING'
WHERE id='#{sj['id']}'"
end
else
$db.do "DELETE FROM splayd_jobs WHERE id='#{sj['id']}'"
end
# it can't be new jobs in data['jobs'] that don't have already an
# entry in splayd_jobs
end
end
def parse_loadavg s
if s.strip != ""
l = s.split(" ")
$db.do "UPDATE splayds SET
load_1='#{l[0]}',
load_5='#{l[1]}',
load_15='#{l[2]}'
WHERE id='#{@id}'"
else
# NOTE should too be fixed in splayd
$log.warn("Splayd #{@id} report an empty loadavg. ")
$db.do "UPDATE splayds SET
load_1='10',
load_5='10',
load_15='10'
WHERE id='#{@id}'"
end
end
# NOTE then corresponding entry may already have been deleted if the reply
# comes after the job has finished his registration, but no problem.
def s_sel_reply(job_id, port, reply_time)
$db.do "UPDATE splayd_selections SET
replied='TRUE',
reply_time='#{reply_time}',
port='#{port}'
WHERE splayd_id='#{@id}' AND job_id='#{job_id}'"
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment