class Object
Constants
- MAX_WS_FRAME_SIZE
Key Variables
Public Instance Methods
Sends back the HTTP header to initialize the websocket connection.
# File lib/easel/websocket.rb, line 196
# Completes the WebSocket opening handshake by writing the
# "101 Switching Protocols" response to the socket.
#
# socket - IO-like object that responds to #write.
# ws_key - the client's Sec-WebSocket-Key header value (String, no trailing CRLF).
#
# Returns whatever socket.write returns.
def accept_connection(socket, ws_key)
  # The accept key is the Base64 SHA-1 digest of the client key followed by
  # this fixed GUID string.
  handshake_guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
  ws_accept_key = Digest::SHA1.base64digest(ws_key + handshake_guid)

  response_lines = [
    "HTTP/1.1 101 Switching Protocols",
    "Upgrade: websocket",
    "Connection: Upgrade",
    "Sec-WebSocket-Accept: #{ws_accept_key}",
    "" # empty line terminates the header section
  ]
  socket.write(response_lines.map { |line| line + "\r\n" }.join)
end
# File lib/easel/build_pages.rb, line 21
# Renders html/app.html.erb and wraps the result in a complete HTTP 200
# response.
#
# Returns the whole response (status line, headers, blank line, body) as a String.
def build_app
  # File.read opens, reads and closes in one call; File.new(...).read left the
  # descriptor open until garbage collection.
  app_erb = File.read("#{File.dirname(__FILE__)}/../html/app.html.erb")
  page = ERB.new(app_erb).result

  "HTTP/1.1 200 OK\r\n" +
    "Content-Type: text/html; charset=UTF-8\r\n" +
    "Content-Length: #{page.bytesize}\r\n" +
    "Connection: close\r\n" +
    "\r\n" +
    page
end
# File lib/easel/build_pages.rb, line 82
# Renders html/app.css.erb (evaluated with this method's binding) and wraps
# the result in a complete HTTP 200 response.
#
# Returns the whole response (status line, headers, blank line, body) as a String.
def build_css
  # File.read closes the file itself; File.new(...).read leaked the handle.
  css_erb = File.read("#{File.dirname(__FILE__)}/../html/app.css.erb")
  css = ERB.new(css_erb).result(binding)

  "HTTP/1.1 200 OK\r\n" +
    "Content-Type: text/css; charset=UTF-8\r\n" +
    "Content-Length: #{css.bytesize}\r\n" +
    "Connection: close\r\n" +
    "\r\n" +
    css
end
# File lib/easel/build_pages.rb, line 98
# Renders html/error.html.erb (evaluated with this method's binding, so the
# template can see `code`) and wraps it in an HTTP response with that status.
#
# code - HTTP status code; also used as the key into @code_names to obtain
#        the reason phrase for the status line.
#
# Returns the whole response (status line, headers, blank line, body) as a String.
def build_error code
  # File.read closes the file itself; File.new(...).read leaked the handle.
  error_erb = File.read("#{File.dirname(__FILE__)}/../html/error.html.erb")
  page = ERB.new(error_erb).result(binding)

  "HTTP/1.1 #{code} #{@code_names[code]}\r\n" +
    "Content-Type: text/html; charset=UTF-8\r\n" +
    "Content-Length: #{page.bytesize}\r\n" +
    "Connection: close\r\n" +
    "\r\n" +
    page
end
# File lib/easel/build_pages.rb, line 36
# Renders html/controller.js.erb and wraps the result in a complete HTTP 200
# response.
#
# Returns the whole response (status line, headers, blank line, body) as a String.
def build_js
  # File.read closes the file itself; File.new(...).read leaked the handle.
  js_erb = File.read("#{File.dirname(__FILE__)}/../html/controller.js.erb")
  page = ERB.new(js_erb).result

  "HTTP/1.1 200 OK\r\n" +
    "Content-Type: text/javascript; charset=UTF-8\r\n" +
    "Content-Length: #{page.bytesize}\r\n" +
    "Connection: close\r\n" +
    "\r\n" +
    page
end
Collects information on the current state of the system, and writes it to @collected_data.
# File lib/easel/data_gathering.rb, line 49
# Runs every data command configured under $config[:dashboards], extracts the
# first regex capture group from each command's output, and passes the
# result to write_data.
#
# The structure handed to write_data is
#   { dashboard_id => { element_index => { data_index => ["HH:MM:SS", value] } } }
# where value is nil when the regex did not match (an error is logged).
def collect_data
  new_data = {}
  # TODO: Check which element type something is to figure out how to handle it.
  log_info "Collecting data."

  $config[:dashboards].each do |dashboard|
    dashboard_data = new_data[dashboard[:id]] = {}

    dashboard[:elements].each_with_index do |element, e_index|
      element_data = dashboard_data[e_index] = {}

      element[:data].each_with_index do |data, index|
        # The command comes from the configuration file and is run by the shell.
        output = `#{data[:cmd]}`
        log_info "Ran `#{data[:cmd]}`, got: #{output}"

        # Check the match explicitly rather than rescuing the NoMethodError
        # that nil[1] would raise.
        match = output.match(/#{data[:regex]}/)
        if match.nil?
          log_error "Data failed to be parsed. Regex: /#{data[:regex]}/ -- Output: #{output}"
        end
        value = match && match[1]

        element_data[index] = [Time.new.strftime("%H:%M:%S"), value]
      end
    end
  end

  write_data new_data
end
# File lib/easel/websocket.rb, line 184
# Looks up the shell command registered under the given ID.
#
# cmd_id - value compared against each entry's :id in $config[:commands].
#
# Returns the :cmd of the first matching entry, or nil when no entry matches.
def get_command(cmd_id)
  entry = $config[:commands].find { |candidate| candidate[:id] == cmd_id }
  entry.nil? ? nil : entry[:cmd]
end
Handle a get request.
# File lib/easel/server.rb, line 93
# Serves a single GET request: picks the response for request[:url], writes
# it to the socket, and closes the socket unless keep-alive was requested.
#
# socket  - IO-like object responding to #print and #close.
# request - request Hash; :url selects the resource.
#
# NOTE(review): the keep-alive check reads request[:Connection], while
# read_HTTP_message appears to store headers under request[:fields] — so this
# probably never matches and the socket is always closed. Confirm before
# changing; every response also advertises "Connection: close".
def handle_get(socket, request)
  response =
    case request[:url]
    when "/", "/index.html"
      build_app
    when "/app.css"
      build_css
    when "/controller.js"
      log_info "building controller"
      build_js
    when "/dashboardElements.js"
      return_js 'dashboardElements.js'
    when "/createComponents.js"
      return_js 'createComponents.js'
    # TODO: respond with favicon
    else
      build_error 404
    end
  socket.print response

  connection = request[:Connection]
  keep_alive = connection.is_a?(String) && connection.downcase == "keep-alive\r\n"
  socket.close unless keep_alive

  log_info "Handled HTTP request: #{request}"
end
# File lib/easel/server.rb, line 45
# Reads one HTTP request from the socket and dispatches it.
#
# * unparsable request        -> 400, socket closed
# * GET with websocket upgrade -> run_websocket
# * GET                        -> handle_get
# * HEAD                       -> the headers handle_get would have sent, socket closed
# * HEAD with websocket upgrade, or any other method -> 400, socket closed
#
# socket - the client connection (IO-like: #print, #close, readable by
#          read_HTTP_message).
def handle_request socket
  log_info "Receieved request: #{socket}"
  request = read_HTTP_message socket
  if request.nil?
    socket.print build_error 400
    socket.close
    return
  end

  # Header values keep their trailing CRLF (see read_HTTP_message).
  wants_websocket = request[:fields][:Upgrade] == "websocket\r\n"

  case request[:method]
  when "GET"
    if wants_websocket
      run_websocket(socket, request)
    else
      handle_get(socket, request)
    end
  when "HEAD"
    if wants_websocket
      # Previously this branch did nothing, leaving the client hanging with
      # an open socket. A WebSocket upgrade must be a GET, so reject it.
      socket.print build_error 400
    else
      socket.print head_response_for(request)
    end
    socket.close
  else
    # Reachable for POST/PUT/DELETE/OPTIONS/TRACE, which read_HTTP_message
    # accepts but this server does not implement.
    log_warning "Unsupported HTTP method received: #{request[:method]}"
    socket.print build_error 400
    socket.close
  end
end

# Builds the reply to a HEAD request: the status line and headers that
# handle_get would send for the same request, without the body.
#
# The GET response is captured in memory. The previous IO.pipe approach
# discarded every second header line and could block forever once the page
# outgrew the pipe buffer.
def head_response_for(request)
  require "stringio"

  buffer = StringIO.new
  handle_get(buffer, request) # may close the buffer; #string still works afterwards
  response = buffer.string

  header_end = response.index("\r\n\r\n")
  header_end ? response[0, header_end + 4] : build_error(500)
end
launch
Launches the Easel Dashboard. Check the $config variable for defaults, although everything can be overridden by the YAML file (and some by the command line arguments).
# File lib/easel.rb, line 22
# Program entry point: parses the command line, applies the YAML overrides to
# $config, numbers the configured commands, freezes $config and starts the
# server.
def launch
  parse_ARGV

  # Load the provided YAML
  overwrite_config $config[:yaml_file]
  log_info("YAML loaded successfully (from: #{$config[:yaml_file]})")

  # Give every command an ID equal to its position in the list.
  $config[:commands].each.with_index { |command, position| command[:id] = position }

  # From here on the configuration is read only.
  $config.freeze

  # Launch the server
  log_info("Launching server at #{$config[:hostname]}:#{$config[:port]}")
  launch_server
end
Launch a background thread to start collecting system info in the background.
# File lib/easel/data_gathering.rb, line 35
# Starts a thread that calls collect_data forever, sleeping
# $config[:collect_data_period] seconds between runs.
#
# Returns the Thread that was started.
def launch_data_collection
  Thread.new {
    loop {
      collect_data
      sleep($config[:collect_data_period])
    }
  }
end
# File lib/easel/controller.rb, line 9
# Runs launch_server inside a Ractor, handing it the given config, and blocks
# on Ractor#take until that Ractor produces a value.
#
# NOTE(review): launch_server in lib/easel/server.rb is declared without
# parameters, so passing the config here looks inconsistent — confirm which
# definition this is meant to call.
def launch_easel config
  # r_server_data = Ractor.new config do
  #   'this is a message' # TODO: Implement this.
  # end

  # The block parameter receives its own copy of the argument; a Ractor block
  # cannot reference outer local variables.
  tcp_listener = Ractor.new(config) { |server_config| launch_server server_config }
  tcp_listener.take
end
# File lib/easel/server.rb, line 15
# Binds a TCPServer to $config[:hostname]:$config[:port], optionally starts
# background data collection, then accepts connections forever, handling
# each client on its own thread via handle_request.
#
# Returns after an Interrupt (Ctrl-C) or after an unexpected error has been
# logged. The listening socket is closed on the way out.
def launch_server
  # Launch the TCPServer
  begin
    server = TCPServer.new($config[:hostname], $config[:port])
  rescue StandardError => e
    log_fatal "Server could not start. Error message: #{e}"
  end

  # Launch data collection if turned on.
  launch_data_collection unless $config[:collect_data_period] == 0

  # An exception in any client thread is re-raised in this (main) thread.
  Thread.abort_on_exception = true

  # Main loop
  begin
    loop do
      Thread.start(server.accept) { |client| handle_request client }
    end
  rescue Interrupt
    # Handle shutting down.
    log_info "Interrupt received, server shutting down..."
  rescue StandardError => e
    # StandardError rather than Exception, so SystemExit, NoMemoryError and
    # similar are not swallowed here.
    log_error "Unexpected error occured and closed client connection. Error: #{e}"
    Array(e.backtrace).each { |trace| log_error "#{trace}" }
  ensure
    # Release the listening port however the loop ended.
    server&.close
  end
end
# File lib/easel/logging.rb, line 19
# Writes an ERROR line to $config[:log_file] when $config[:logging] is at
# least 1.
#
# msg - message fragments; joined with single spaces.
def log_error *msg
  return if $config[:logging] < 1

  timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
  $config[:log_file].puts "[#{timestamp}] ERROR: " + msg.join(" ")
end
# File lib/easel/logging.rb, line 12
# Writes a FATAL line to $config[:log_file] (unless logging is silenced with
# level 0) and then terminates the process with exit status 1.
#
# msg - message fragments; joined with single spaces.
def log_fatal *msg
  if $config[:logging] != 0
    timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
    $config[:log_file].puts "[#{timestamp}] FATAL: " + msg.join(" ")
  end

  exit 1
end
# File lib/easel/logging.rb, line 31
# Writes an INFO line to $config[:log_file] when $config[:logging] is at
# least 3.
#
# msg - message fragments; joined with single spaces.
def log_info *msg
  return if $config[:logging] < 3

  timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
  $config[:log_file].puts "[#{timestamp}] INFO: " + msg.join(" ")
end
# File lib/easel/logging.rb, line 25
# Writes a WARNING line to $config[:log_file] when $config[:logging] is at
# least 2.
#
# msg - message fragments; joined with single spaces.
def log_warning *msg
  return if $config[:logging] < 2

  timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
  $config[:log_file].puts "[#{timestamp}] WARNING: " + msg.join(" ")
end
# File lib/easel.rb, line 108
# Recursively copies the values of a parsed YAML mapping into a config Hash,
# converting every key to a Symbol.
#
# * Hash values are merged into the existing nested Hash (created when the
#   config has no Hash under that key yet, instead of crashing on nil).
# * Array values replace the existing entry; Hash elements are converted
#   recursively, all other elements (strings, numbers, ...) are copied as-is
#   instead of raising.
# * Anything else overwrites the existing value.
#
# config - Hash to update in place.
# yaml   - Hash with the overriding values.
#
# Returns yaml.
def loop_overwrite(config, yaml)
  yaml.each do |key, value|
    name = key.to_sym

    case value
    when Hash
      config[name] = {} unless config[name].is_a?(Hash)
      loop_overwrite(config[name], value)
    when Array
      config[name] = value.map do |item|
        if item.is_a?(Hash)
          {}.tap { |element| loop_overwrite(element, item) }
        else
          item
        end
      end
    else
      config[name] = value
    end
  end
end
Overwrites the $config fields that are set in the YAML file provided on input. TODO: Log (error?) every key in the YAML file that does not exist in $config.
# File lib/easel.rb, line 93
# Loads the given YAML file and overwrites the matching entries of $config
# with its contents. Terminates through log_fatal when the file cannot be
# loaded or does not contain a top-level mapping.
#
# yaml_filename - path of the YAML configuration file. (This argument is now
#                 actually used; previously $config[:yaml_file] was always
#                 read instead.)
def overwrite_config yaml_filename
  # TODO: Rewrite using pattern matching to allow checking if the
  # yaml_contents.each is one of the base keys. If so, check that the associated
  # value matches the expected nesting. Do that 'no other values' check.

  # TODO: Ensure that the command names are less than 1020 bytes (because I'm
  # setting the max length of a single websocket message to 1024 (minus 'STOP:'))

  begin
    yaml_contents = YAML.load_file yaml_filename
  rescue StandardError => e
    log_fatal "YAML failed to load. Error Message: #{e}"
  end

  # An empty file parses to nil/false and a bare list to an Array; neither
  # can be merged into $config.
  unless yaml_contents.is_a?(Hash)
    log_fatal "YAML failed to load. Error Message: expected a top-level mapping in #{yaml_filename}"
  end

  loop_overwrite($config, yaml_contents)
end

# Recursively copies the values of a parsed YAML mapping into a config Hash,
# converting keys to Symbols. Defined at top level rather than inside
# overwrite_config, so it is no longer redefined on every call.
#
# Hash values are merged (a missing nested Hash is created), Array values
# replace the entry (Hash elements converted recursively, other elements
# copied as-is), anything else overwrites. Returns yaml.
def loop_overwrite(config, yaml)
  yaml.each do |key, value|
    name = key.to_sym

    case value
    when Hash
      config[name] = {} unless config[name].is_a?(Hash)
      loop_overwrite(config[name], value)
    when Array
      config[name] = value.map do |item|
        if item.is_a?(Hash)
          {}.tap { |element| loop_overwrite(element, item) }
        else
          item
        end
      end
    else
      config[name] = value
    end
  end
end
Parses the command line arguments (ARGV) using the optparse library. The optional command line arguments can be listed by running this program with the -h (or --help) flag.
# File lib/easel.rb, line 43
# Parses ARGV with OptionParser and stores the results in $config.
#
# Flags: -h/--help, -l/--log LOG_LEVEL, -p/--port PORT, -H/--hostname HOST,
# -o/--output [FILE]. Exactly one positional argument (the YAML file) must
# remain after the flags; it is stored in $config[:yaml_file].
#
# Two defects were fixed here:
# * "-h" was registered for both --help and --hostname, so the second
#   registration took over and "-h" no longer printed the help. The hostname
#   short flag is now "-H".
# * the hostname was stored under $config[:host], while the server and the
#   option's own description read $config[:hostname].
def parse_ARGV
  parser = OptionParser.new do |opts|
    opts.banner = "Useage: launch.rb [flags] configuration.yaml"

    opts.on("-h", "--help", "Prints this help message.") do
      puts opts
      exit
    end

    opts.on("-l LOG_LEVEL", "--log LOG_LEVEL", Integer, "Sets the logging level (default=2). 0=Silent, 1=Errors, 2=Warnings, 3=Info.") do |lvl|
      if [0, 1, 2, 3].include?(lvl)
        $config[:logging] = lvl
      else
        log_fatal "Command argument LOG_LEVEL '#{lvl}' not recognized. Expected 0, 1, 2, or 3."
      end
    end

    opts.on("-p PORT", "--port PORT", Integer, "Sets the port to bind to. Default is #{$config[:port]}.") do |port|
      if (0..65535).cover?(port)
        $config[:port] = port
      else
        log_fatal "Command argument PORT '#{port}' not a valid port. Must be between 0 and 65535 (inclusive)"
      end
    end

    opts.on("-H HOST", "--hostname HOST", "Sets the hostname. Default is '#{$config[:hostname]}'.") do |host|
      $config[:hostname] = host
    end

    opts.on("-o [FILE]", "--output [FILE]", "Set a log file.") do |filename|
      begin
        $config[:log_file] = File.new(filename, "a")
      rescue StandardError => e
        # Also reached when the flag is given without a FILE (filename is nil).
        log_error "Log file could not be open. Sending log to STDIN. Error message: #{e}"
      end
    end
  end
  parser.parse!

  if ARGV.length != 1
    log_fatal "launch.rb takes exactly one file. Try -h for more details."
  else
    $config[:yaml_file] = ARGV[0]
  end
end
Read an HTTP message from the socket, and parse it into a request Hash.
# File lib/easel/server.rb, line 126
# Reads one HTTP request head (request line plus headers) from the socket.
#
# socket - IO-like object responding to #gets.
#
# Returns nil when the request line is missing or malformed, or when the
# connection ends before the blank line that terminates the headers
# (previously that case looped forever). Otherwise returns a Hash:
#
#   { method:, url:, protocol:, fields: { Header_Name: "value\r\n", ... } }
#
# Header names have "-" replaced by "_" and are symbolized. Header values
# keep their trailing CRLF, which callers compare against (for example
# "websocket\r\n").
def read_HTTP_message socket
  request_line = socket.gets
  return nil if request_line.nil? ||
                !request_line.match?(/^(GET|HEAD|POST|PUT|DELETE|OPTIONS|TRACE) .+ HTTP.+/)

  header_lines = []
  loop do
    line = socket.gets
    return nil if line.nil?  # EOF before the end of the headers
    break if line == "\r\n"  # blank line: end of headers (not stored as a field)

    header_lines << line
  end

  request = { fields: {} }
  request[:method], request[:url], request[:protocol] = request_line.split(" ")

  header_lines.each do |line|
    # Limit of 2 keeps values that themselves contain ": " intact.
    key, value = line.split(": ", 2)
    request[:fields][key.split("-").join("_").to_sym] = value
  end

  request
end
Reads (copies) @collected_data, and handles the Readers/Writer problem by using semaphores and a mutex. Returns a copy
# File lib/easel/data_gathering.rb, line 125
# Returns a (shallow, via #dup) copy of @collected_data.
#
# Entry protocol: under @join_mutex, the reader registers itself by
# releasing one permit on @readers_semaphore, but only while
# @writers_semaphore reports zero permits. Otherwise it retries every 50 ms.
# After copying, the reader deregisters by acquiring one permit again.
def read_data
  loop do
    admitted = @join_mutex.synchronize do
      if @writers_semaphore.available_permits == 0
        @readers_semaphore.release 1 # Increment @readers_semaphore
        # TODO: likely need to release the mutex.
        true
      else
        false
      end
    end
    break if admitted

    sleep 0.05 # Wait 50ms to give another thread time to lock the @join_mutex.
  end

  # Read Data
  snapshot = @collected_data.dup
  @readers_semaphore.acquire 1 # Decrement @readers_semaphore
  snapshot
end
Extremely naive WebSocket frame reader. Requires masked frames with a message length of at most 125. Needs to be updated to conform with RFC 6455 (The WebSocket Protocol).
# File lib/easel/websocket.rb, line 212
# Reads a single WebSocket frame from the socket and returns its text.
#
# Only final (FIN=1), masked text frames (opcode 1) with a 7-bit payload
# length below both 126 and MAX_WS_FRAME_SIZE are accepted.
#
# socket - IO-like object responding to #getbyte and #close.
#
# Returns the unmasked payload as a String, or nil when:
# * the connection ended (also in the middle of a frame),
# * the client sent a close frame (0x88) — the socket is closed, or
# * the frame is not acceptable (an error is logged).
#
# The returned text is the String#inspect form of the payload with the
# surrounding quotes removed, so control characters, quotes and invalid
# bytes arrive escaped. This matches the previous behaviour.
def receive_msg socket
  # Check first two bytes
  byte1 = socket.getbyte
  byte2 = socket.getbyte
  return nil if byte1.nil? || byte2.nil?

  if byte1 == 0x88 # Client is requesting that we close the connection.
    # TODO: Unsure how to properly handle this case. Right now the socket will close and
    #   everything here will shut down - eventually? Kill all child threads first?
    log_info "Client requested the websocket be closed."
    socket.close
    return
  end

  # Compare against zero explicitly: 0 is truthy in Ruby, so the raw masked
  # integers previously made the FIN and MASK checks pass for every frame.
  fin = (byte1 & 0b10000000) != 0
  opcode = byte1 & 0b00001111
  is_masked = (byte2 & 0b10000000) != 0
  msg_size = byte2 & 0b01111111 # 126 and 127 announce an extended length (unsupported)

  unless fin && opcode == 1 && is_masked && msg_size < 126 && msg_size < MAX_WS_FRAME_SIZE
    log_error "Invalid websocket message received. #{fin}, #{opcode == 1}, #{is_masked}, #{msg_size}"
    # Discard the rest of the frame: masking key (if any) plus payload.
    ((is_masked ? 4 : 0) + msg_size).times { socket.getbyte }
    return
  end

  # Get message
  mask = Array.new(4) { socket.getbyte }
  payload = Array.new(msg_size) { socket.getbyte }
  return nil if mask.include?(nil) || payload.include?(nil) # connection ended mid-frame

  msg = payload.each_with_index
               .map { |byte, i| byte ^ mask[i % 4] }
               .pack('C*')
               .force_encoding('utf-8')
               .inspect
  log_info "WebSocket received: #{msg}"
  msg[1..-2] # Remove quotation marks from message
end
# File lib/easel/build_pages.rb, line 67
# Reads a static file from the html/ directory and wraps it in a complete
# HTTP 200 text/html response.
#
# file - file name relative to the html/ directory.
#
# Returns the whole response (status line, headers, blank line, body) as a String.
def return_html file
  # File.read closes the file itself; File.new(...).read leaked the handle.
  page = File.read("#{File.dirname(__FILE__)}/../html/#{file}")

  "HTTP/1.1 200 OK\r\n" +
    "Content-Type: text/html; charset=UTF-8\r\n" +
    "Content-Length: #{page.bytesize}\r\n" +
    "Connection: close\r\n" +
    "\r\n" +
    page
end
# File lib/easel/build_pages.rb, line 52
# Reads a static file from the html/ directory and wraps it in a complete
# HTTP 200 text/javascript response.
#
# file - file name relative to the html/ directory.
#
# Returns the whole response (status line, headers, blank line, body) as a String.
def return_js file
  # File.read closes the file itself; File.new(...).read leaked the handle.
  page = File.read("#{File.dirname(__FILE__)}/../html/#{file}")

  "HTTP/1.1 200 OK\r\n" +
    "Content-Type: text/javascript; charset=UTF-8\r\n" +
    "Content-Length: #{page.bytesize}\r\n" +
    "Connection: close\r\n" +
    "\r\n" +
    page
end
Run a command and stream the stdout and stderr through the websocket.
# File lib/easel/websocket.rb, line 145
# Runs the configured command with the given ID and forwards its output line
# by line over the websocket: stdout lines as "OUT" messages, stderr lines as
# "ERR" messages, followed by one "FINISHED" message once the command has
# exited.
#
# socket         - websocket connection, passed through to send_msg.
# cmd_id         - ID of the command (see get_command).
# send_msg_mutex - Mutex serializing writes to this websocket.
#
# Logs an error and returns nil when no command has that ID.
def run_command_and_stream(socket, cmd_id, send_msg_mutex)
  cmd = get_command cmd_id
  if cmd.nil?
    log_error "Client requested command ID #{cmd_id} be run, but that ID does not exist."
    return
  end

  Open3.popen3(cmd) do |stdin, stdout, stderr, cmd_thread|
    # Nothing is ever written to the command; closing stdin keeps commands
    # that read it from waiting forever.
    stdin.close

    labels = { stdout => "OUT", stderr => "ERR" }
    open_streams = [stdout, stderr]

    # Keep reading until BOTH streams reached EOF. Stopping at the first EOF
    # (the previous behaviour) dropped whatever the other stream still held.
    until open_streams.empty?
      ready_fds, = IO.select(open_streams)
      ready_fds.each do |fd|
        resp = fd.gets
        if resp.nil?
          open_streams.delete(fd)
        else
          send_msg(socket, send_msg_mutex, cmd_id, labels.fetch(fd), resp)
        end
      end
    end

    cmd_thread.join
    send_msg(socket, send_msg_mutex, cmd_id, "FINISHED")
  end
end
# File lib/easel/websocket.rb, line 78
# Serves one websocket connection until the client goes away.
#
# After the opening handshake it
# * starts a thread that periodically pushes the collected dashboard data
#   (unless $config[:collect_data_period] is 0), and
# * reads client messages: "RUN:<id>" starts streaming the command with that
#   ID on its own thread, "STOP:<id>" kills that thread.
#
# socket          - the client connection.
# initial_request - the parsed upgrade request (see read_HTTP_message).
#
# Fixes over the previous version:
# * messages are dispatched on the same "RUN:" / "STOP:" prefix the ID
#   extraction expects (the old `msg.split(":")[1]` looked at the ID instead),
# * the dashboard thread and any running command threads are stopped when
#   the connection ends, instead of writing to a closed socket forever,
# * a request without Sec-WebSocket-Key gets a 400 instead of crashing,
# * a command can be started again once its previous thread has finished.
def run_websocket(socket, initial_request)
  ws_key = initial_request[:fields][:Sec_WebSocket_Key]
  if ws_key.nil?
    log_error "WebSocket upgrade requested without a Sec-WebSocket-Key header."
    socket.print build_error 400
    socket.close
    return
  end

  # Header values still carry their trailing CRLF.
  accept_connection(socket, ws_key.strip)
  log_info "Accepted WebSocket Connection"

  child_threads = {}
  send_msg_mutex = Mutex.new # One mutex per websocket to control sending messages.

  dashboard_thread = nil
  unless $config[:collect_data_period] == 0
    dashboard_thread = Thread.new do # Periodically update the generic dashboard if set.
      loop do
        data = read_data
        send_msg(socket, send_msg_mutex, nil, "DASH", data)
        sleep $config[:collect_data_period]
      end
    end
  end

  begin
    loop do
      begin
        msg = receive_msg socket
      rescue Errno::ECONNRESET
        log_error "Client reset the connection"
        socket.close
        msg = nil
      end
      break if msg.nil? # The socket was closed by the client.

      case msg
      when /^RUN:(.*)$/
        cmd_id = Regexp.last_match(1).to_i
        # alive? rather than presence, so a finished thread never blocks a re-run.
        unless child_threads[cmd_id]&.alive?
          child_threads[cmd_id] = Thread.new do
            run_command_and_stream(socket, cmd_id, send_msg_mutex)
            child_threads[cmd_id] = nil
          end
        end
      when /^STOP:(.*)$/
        cmd_id = Regexp.last_match(1).to_i
        child_threads[cmd_id]&.kill
        child_threads[cmd_id] = nil
      else
        log_error "Received an unrecognized message over the websocket: #{msg}"
      end
    end
  rescue StandardError => e
    log_error "Wuh Woh #2: #{e}"
    Array(e.backtrace).each { |trace| log_error "#{trace}" }
    raise
  ensure
    # The connection is gone: stop everything that would still write to it.
    dashboard_thread&.kill
    child_threads.each_value { |thread| thread&.kill }
  end
end
Sends a message over the websocket. Requires that the message be an appropriate length and have the right format (i.e. checks should be done before calling this function). TODO: Figure out the proper frame size (MAX_WS_FRAME_SIZE).
# File lib/easel/websocket.rb, line 339
# Writes msg to the socket as one unmasked, final WebSocket text frame.
#
# The length field is derived from the payload's BYTE size (the character
# count was used before, which truncated multi-byte UTF-8 text), and the
# extended 16-bit / 64-bit length encodings are emitted for payloads larger
# than 125 bytes, which previously produced an invalid frame.
#
# socket - IO-like object responding to #write.
# msg    - String payload.
#
# A closed socket (IOError / Errno::EPIPE) is logged and otherwise ignored.
def send_frame(socket, msg)
  payload = msg.b
  length = payload.bytesize
  first_byte = 0b10000001 # FIN set, opcode 1 (text)

  header =
    if length <= 125
      [first_byte, length].pack("CC")
    elsif length <= 0xFFFF
      [first_byte, 126, length].pack("CCn")  # 126: 16-bit big-endian length follows
    else
      [first_byte, 127, length].pack("CCQ>") # 127: 64-bit big-endian length follows
    end

  begin
    socket.write header + payload
  rescue IOError, Errno::EPIPE
    log_error "WebSocket is closed. Msg: #{msg}"
  end
end
# File lib/easel/websocket.rb, line 258
# Sends one logical message over the websocket, splitting it into several
# frames when it does not fit into MAX_WS_FRAME_SIZE characters.
#
# socket         - websocket connection (handed to send_frame).
# send_msg_mutex - Mutex serializing writes to this websocket.
# cmd_id         - command ID the message belongs to (unused for "DASH").
# msg_type       - one of:
#                  "OUT" / "ERR"        frames "<cmd_id>:<type>:<text part>"
#                  "DASH"               msg is { dash_id => { element_index =>
#                                       { key => value } } }; every key/value
#                                       becomes "<dash_id><element_index>:A:<key>-><value>"
#                                       or, when too long, numbered parts
#                                       "<dash_id><element_index>:<i>/<n>:<part>"
#                  "CLEAR" / "FINISHED" frame "<cmd_id>:<type>", no payload
# msg            - payload (String for OUT/ERR, Hash for DASH, nil otherwise).
#
# Invalid combinations are logged and nothing is sent. A "DASH" message
# without payload now returns after logging; it used to continue and crash
# with NoMethodError.
def send_msg(socket, send_msg_mutex, cmd_id, msg_type, msg=nil)
  case msg_type
  when "OUT", "ERR" # See comments at the top of the file to explain this part of the protocol.
    header = "#{cmd_id}:#{msg_type}:"
    if header.length >= MAX_WS_FRAME_SIZE
      # ">=": a header filling the whole frame leaves no room for payload
      # (and used to end in a division by zero while splitting).
      log_error "Message header '#{msg_type}' is too long. Msg: #{msg}."
    elsif msg.nil?
      log_error "Message of type '#{msg_type}' sent without a message."
    else
      room = MAX_WS_FRAME_SIZE - header.length
      parts = msg.length > room ? ws_text_slices(msg, room) : [msg]
      send_msg_mutex.synchronize do
        parts.each { |part| send_frame(socket, header + part) }
      end
    end

  when "DASH" # See comments at the top of the file to explain this part of the protocol.
    if msg.nil?
      log_error "Message of type '#{msg_type}' sent without a message."
      return
    end

    msg.each_pair do |dash_id, elements|
      elements.each_pair do |ele_index, readings|
        did = "#{dash_id}#{ele_index}"
        send_msg_mutex.synchronize do
          readings.each_pair do |key, reading|
            data_fragment = "#{key}->#{reading}"

            if data_fragment.length > MAX_WS_FRAME_SIZE - (did.length + 3)
              # TODO: Handle case where header is longer than DID:XX/XX:
              msg_part_len = MAX_WS_FRAME_SIZE - (did.length + 7)
              if msg_part_len <= 0
                log_error "Message header '#{msg_type}' is too long. Data: #{data_fragment}."
                next
              end

              msg_parts = ws_text_slices(data_fragment, msg_part_len)
              msg_parts.each_with_index do |part, index|
                header = did + ":#{index + 1}/#{msg_parts.length}:"
                if header.length > MAX_WS_FRAME_SIZE
                  log_error "Message header '#{msg_type}' is too long. Data: #{data_fragment}."
                end
                send_frame(socket, header + part)
              end
            else
              send_frame(socket, did + ":A:" + data_fragment)
            end
          end
        end
      end
    end

  when "CLEAR", "FINISHED"
    if !msg.nil?
      log_error "Message of type '#{msg_type}' passed an empty message. Msg: #{msg}."
    end

    to_send = "#{cmd_id}:#{msg_type}"
    send_msg_mutex.synchronize do
      if to_send.length > MAX_WS_FRAME_SIZE
        log_error "Message of type '#{msg_type}' is too long. Msg: #{to_send}."
      else
        send_frame(socket, to_send)
      end
    end

  else
    log_error "Trying to send a websocket message with unrecognized type: #{msg_type}"
  end

  log_info "Message sent via WebSocket: #{msg}"
end

# Cuts text into consecutive pieces of at most slice_len characters.
# slice_len must be positive.
def ws_text_slices(text, slice_len)
  (0..(text.length - 1) / slice_len).map { |i| text[i * slice_len, slice_len] }
end
Writes the data collected to @collected_data, and handles the Readers/Writer problem by using semaphores and a mutex.
# File lib/easel/data_gathering.rb, line 84
# Merges freshly collected data into @collected_data.
#
# Entry protocol: under @join_mutex, the writer registers itself by releasing
# one permit on @writers_semaphore, but only while both @readers_semaphore and
# @writers_semaphore report zero permits. Otherwise it retries every 50 ms.
# After writing, the writer deregisters by acquiring the permit again.
#
# data - Hash of new values. The :load key is appended to the list kept under
#        @collected_data[:load]; every other key replaces the stored value.
#
# Unlike before, the writer no longer sleeps 50 ms after it has already been
# admitted (read_data never did); it only sleeps between failed attempts.
def write_data data
  joined = false
  until joined
    @join_mutex.synchronize do
      if @readers_semaphore.available_permits == 0 && @writers_semaphore.available_permits == 0
        @writers_semaphore.release 1 # Increment @writers_semaphore
        joined = true
      end
    end
    sleep 0.05 unless joined # Wait 50ms to give another thread time to lock the @join_mutex.
  end

  # Write Data
  data.each_key do |key|
    case key
    when :load
      @collected_data[:load] = [] if @collected_data[:load].nil?
      @collected_data[:load] << data[:load]
    else
      @collected_data[key] = data[key]
    end
  end

  # Log if @collected_data has gotten too large.
  load_history = @collected_data[:load]
  if !load_history.nil? && load_history.length > 240
    if load_history.length > 1000
      log_error "Easel dashboard has run collect_data more than a 1000 times. Memory size likely to be large."
    else
      log_warning "Easel dashboard starting to take up a lot of memory due to data_collection being turned on."
    end
  end

  @writers_semaphore.acquire 1 # Decrement @writers_semaphore
end