class Farmstead::Service
Public Class Methods
new()
click to toggle source
# File lib/farmstead/service.rb, line 4
# Sets up a service instance: calls read_environment, then creates the
# Kafka client together with one producer and one consumer, and records
# the log file path.
def initialize
  read_environment
  #mysql_init
  brokers = ["#{@broker_host}:9092"]
  @kafka = Kafka.new(seed_brokers: brokers, client_id: @service_name)
  @producer = @kafka.producer
  # The service name doubles as the consumer group id.
  @consumer = @kafka.consumer(group_id: @service_name)
  @logfile = "/tmp/farmlog"
end
Public Instance Methods
consumer()
click to toggle source
Subscribes to a topic and works on the message.
# File lib/farmstead/service.rb, line 36
# Loops indefinitely, printing 'Consuming' on every pass. The method does
# not return on its own.
def consumer
  loop { puts 'Consuming' }
end
get_from_json(json, element)
click to toggle source
Gets the value of an element from json
# File lib/farmstead/service.rb, line 53
# Parses +json+ and returns the value stored under +element+.
#
# JSON.parse produces String keys for JSON objects, so a Symbol +element+
# is converted to a String before the lookup (otherwise it would never
# match and nil would be returned). Any other +element+ -- for example an
# Integer index into a JSON array -- is used unchanged.
#
# Returns nil when the key is absent from a JSON object.
# Raises JSON::ParserError when +json+ is not valid JSON.
def get_from_json(json, element)
  key = element.is_a?(Symbol) ? element.to_s : element
  JSON.parse(json)[key]
end
magic_work(body)
click to toggle source
# File lib/farmstead/service.rb, line 63
# Parses +body+ as JSON, sets its 'scarecrow' entry to the string 'true',
# prints the re-serialized document and passes it to write_message for
# the 'Forest' topic.
def magic_work(body)
  payload = JSON.parse(body)
  payload['scarecrow'] = 'true'
  encoded = payload.to_json
  puts "Writing: #{encoded}"
  write_message(encoded, topic: 'Forest')
end
print_time()
click to toggle source
# File lib/farmstead/service.rb, line 58
# Appends a "Current Time : ..." line carrying the current time to
# @logfile by way of write_file.
def print_time
  stamp = Time.new.inspect
  write_file(@logfile, "Current Time : #{stamp}")
end
producer()
click to toggle source
Runs in an infinite loop, processing records in the MySQL DB
and writing messages accordingly.
# File lib/farmstead/service.rb, line 28
# Loops indefinitely, printing 'Producing' on every pass. The method does
# not return on its own.
def producer
  loop { puts 'Producing' }
end
read_environment()
click to toggle source
# File lib/farmstead/service.rb, line 16
# Copies settings from environment variables into instance variables.
# An environment variable that is not set leaves its instance variable nil.
# Returns the value of ENV['SERVICE'].
def read_environment
  # Instance variable name (without '@') => environment variable it mirrors.
  {
    broker_host: 'KAFKA_ADVERTISED_HOST_NAME',
    mysql_host: 'MYSQL_HOST',
    mysql_pass: 'MYSQL_PASSWORD',
    mysql_user: 'MYSQL_USER',
    mysql_db: 'MYSQL_DATABASE'
  }.each { |ivar, variable| instance_variable_set("@#{ivar}", ENV[variable]) }
  #@selenium_hub = ENV['SELENIUM_HUB']
  # Assigned last so it remains the method's return value.
  @service_name = ENV['SERVICE']
end
write_file(filename, text)
click to toggle source
Appends to existing file
# File lib/farmstead/service.rb, line 48
# Opens +filename+ in append mode and writes +text+ followed by a newline.
# Returns whatever the underlying write call returns.
def write_file(filename, text)
  File.open(filename, 'a') do |log|
    log.write("#{text}\n")
  end
end
write_message(message, topic)
click to toggle source
# File lib/farmstead/service.rb, line 42
# Queues +message+ on @producer for +topic+ and then delivers the queued
# messages.
#
# +topic+ may be the topic itself or an options hash of the form
# { topic: name }, which is what a call such as
# write_message(msg, topic: 'Forest') passes in. The topic is always
# forwarded to the producer as an explicit keyword argument, so the call
# does not rely on Ruby 2.x implicit hash-to-keyword conversion.
#
# Raises KeyError when +topic+ is a Hash without a :topic key.
# Returns the result of @producer.deliver_messages.
def write_message(message, topic)
  topic = topic.fetch(:topic) if topic.is_a?(Hash)
  @producer.produce(message, topic: topic)
  @producer.deliver_messages
end