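# Generate batches of synthetic Amazon provider inventory and stream them to a
# Kafka 'inventory' topic as YAML-serialized persisters, using the ManageIQ
# StreamedData persister. (Gist by @agrare, 2017-05-18.)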
require 'kafka'
$producer = Kafka.new(seed_brokers: ['localhost:9092']).producer
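# Flush the accumulated persister to Kafka once `count` exceeds `batch_size`
# (or when called with :rest for the final partial batch), returning a fresh
# persister and a reset counter; otherwise just bump the counter.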
def send_or_update(ems, persister, count, batch_size)
  if count == :rest || count > batch_size
    inventory_yaml = persister.to_yaml
    puts "sending inventory to kafka, #{inventory_yaml.size} bytes"
    $producer.produce(inventory_yaml, topic: 'inventory')
    $producer.deliver_messages
    # ...and create a new persister so the old one, full of data, can be GCed
    return_persister = ManageIQ::Providers::Amazon::Inventory::Persister::StreamedData.new(
      ems, ems
    )
    return_count = 1
  else
    return_persister = persister
    return_count = count + 1
  end
  return return_persister, return_count
end
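# Build `total_elements` records of the given entity type, flushing to Kafka
# every `batch_size` records via send_or_update.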
def process_entity(ems, entity_name, starting_persister, starting_count, total_elements, batch_size)
  persister = starting_persister
  count = starting_count
  (1..total_elements).each do |index|
    send("parse_#{entity_name}", index, persister)
    persister, count = send_or_update(ems, persister, count, batch_size)
  end
  return persister, count
end
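# The parse_* helpers below build synthetic inventory records; cross-entity
# references use lazy_find so the referenced record does not have to be built
# (or even exist yet) at the time the reference is created.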
def parse_orchestration_stack(index, persister)
  parent = index > 2 ? persister.orchestration_stacks.lazy_find("stack_#{index - 1}") : nil
  persister.orchestration_stacks.build(
    :ems_ref => "stack_#{index}",
    :name => "stack_#{index}_name",
    :description => "stack_#{index}_description",
    :status => "stack_#{index}_ok",
    :status_reason => "stack_#{index}_status_reason",
    :parent => parent
  )
end
def parse_vm(index, persister)
  persister.vms.build(
    :ems_ref => "instance_#{index}",
    :uid_ems => "instance_#{index}_uid_ems",
    :name => "instance_#{index}_name",
    :vendor => "amazon",
    :raw_power_state => "instance_#{index} status",
    :boot_time => Time.now.utc, # this causes the data to be updated on the second and later refreshes
    :availability_zone => persister.availability_zones.lazy_find("az_#{index}"),
    :flavor => persister.flavors.lazy_find("flavor_#{index}"),
    :genealogy_parent => persister.miq_templates.lazy_find("image_#{index}"),
    # :key_pairs => [persister.key_pairs.lazy_find("key_pair_#{index}")],
    :location => persister.networks.lazy_find("image_#{index}__public", :key => :hostname, :default => 'unknown'),
    :orchestration_stack => persister.orchestration_stacks.lazy_find("stack_#{index}")
  )
end
def parse_image(index, persister)
  persister.miq_templates.build(
    :ems_ref => "image_#{index}",
    :uid_ems => "image_#{index}_uid_ems",
    :name => "image_#{index}_name",
    :location => "image_#{index}_location",
    :vendor => "amazon",
    :raw_power_state => "never",
    :template => true,
    :publicly_available => true
  )
end
def parse_flavor(index, persister)
  persister.flavors.build(
    :ems_ref => "flavor_#{index}",
    :name => "flavor_#{index}_name",
    :description => "flavor_#{index}_description",
    :enabled => true,
    :cpus => 1,
    :cpu_cores => 1,
    :memory => 1024,
    :supports_32_bit => true,
    :supports_64_bit => true,
    :supports_hvm => true,
    :supports_paravirtual => false,
    :block_storage_based_only => true,
    :cloud_subnet_required => true,
    :ephemeral_disk_size => 10,
    :ephemeral_disk_count => 1
  )
end
def parse_availability_zone(index, persister)
  persister.availability_zones.build(
    :ems_ref => "az_#{index}",
    :name => "az_#{index}_name"
  )
end
def parse_hardware(index, persister)
  persister.hardwares.build(
    :vm_or_template => persister.vms.lazy_find("instance_#{index}"),
    :bitness => 64,
    :virtualization_type => "hvm",
    :root_device_type => "root_device_type",
    :cpu_sockets => 4,
    :cpu_cores_per_socket => 1,
    :cpu_total_cores => 6,
    :memory_mb => 600,
    :disk_capacity => 200,
    :guest_os => persister.hardwares.lazy_find("image_#{index}", :key => :guest_os)
  )
end
def parse_key_pair(index, persister)
  persister.key_pairs.build(
    :name => "key_pair_#{index}",
    :fingerprint => "key_pair_#{index}_fingerprint"
  )
end
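# Drive a full synthetic refresh: build every entity type for the named EMS
# and stream the inventory to Kafka in batches of `batch_size`.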
def generate_batches_of_data(ems_name:, total_elements:, batch_size: 10)
  ems = ExtManagementSystem.find_by(:name => ems_name)
  persister = ManageIQ::Providers::Amazon::Inventory::Persister::StreamedData.new(
    ems, ems
  )
  count = 1
  persister, count = process_entity(ems, :vm, persister, count, total_elements, batch_size)
  persister, count = process_entity(ems, :image, persister, count, total_elements, batch_size)
  persister, count = process_entity(ems, :hardware, persister, count, total_elements, batch_size)
  persister, count = process_entity(ems, :availability_zone, persister, count, total_elements, batch_size)
  persister, count = process_entity(ems, :flavor, persister, count, total_elements, batch_size)
  persister, count = process_entity(ems, :orchestration_stack, persister, count, total_elements, batch_size)
  persister, count = process_entity(ems, :key_pair, persister, count, total_elements, batch_size)
  # Send whatever is left over, i.e. a final batch smaller than the batch size
  send_or_update(ems, persister, :rest, batch_size)
end
generate_batches_of_data(:ems_name => "my_ems", :total_elements => 1234)
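# A minimal sketch of the consuming side, kept commented out so this script's
# behavior is unchanged. It assumes the same ruby-kafka gem, the 'inventory'
# topic used above, and an environment where the ManageIQ persister classes
# are loadable so the YAML payload can be deserialized.
#
#   require 'kafka'
#   require 'yaml'
#
#   kafka = Kafka.new(seed_brokers: ['localhost:9092'])
#   kafka.each_message(topic: 'inventory') do |message|
#     # Each message is a YAML dump of a StreamedData persister
#     puts "received #{message.value.size} bytes of inventory"
#     persister = YAML.load(message.value)
#     # ...hand the persister off to the refresh/save workers here
#   end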