class CorosyncCommander
This provides a simplified interface into Corosync::CPG. The main use case is for sending commands to a remote server, and waiting for the responses.
This library takes care of:
- Ensuring a consistent message format.
- Sending messages to all nodes, or just to specific nodes.
- Invoking the appropriate callback (and passing parameters) based on the command sent.
- Responding with the return value of the callback.
- Handling exceptions and sending them back to the sender.
- Knowing exactly how many responses should be coming back.
@example
cc = CorosyncCommander.new
cc.commands.register('shell command') do |sender, shellcmd|
  %x{#{shellcmd}}
end
cc.join('my group')

exe = cc.execute([], 'shell command', 'hostname')

enum = exe.to_enum
hostnames = []
begin
  enum.each do |sender, response|
    hostnames << response
  end
rescue CorosyncCommander::RemoteException => e
  puts "Caught remote exception: #{e}"
  retry
end

puts "Hostnames: #{hostnames.join(' ')}"
IMPORTANT: Will not work without tuning Ruby.
You cannot use this with MRI Ruby older than 2.0, and even with 2.0 you must tune Ruby. Corosync CPG (as of 1.4.3) allocates a 1 MB buffer on the stack, while Ruby 2.0 allocates only a 512 KB stack for threads by default. This gem uses a thread to handle incoming messages, so the buffer does not fit on that thread's stack. If you try to use an older Ruby you will get segfaults.
Ruby 2.0 allows increasing the thread stack size through the RUBY_THREAD_MACHINE_STACK_SIZE environment variable. The advised value is 1.5 MB (1572864 bytes):
RUBY_THREAD_MACHINE_STACK_SIZE=1572864 ruby yourscript.rb
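The advised value is 1.5 × 1024 × 1024 bytes. A minimal sketch of a preflight check a script could run before creating a CorosyncCommander instance is shown here. The names `REQUIRED_STACK_BYTES` and `stack_size_ok?` are invented for illustration and are not part of the gem.

```ruby
# Hypothetical helper, not part of corosync_commander.
REQUIRED_STACK_BYTES = 3 * 512 * 1024 # 1.5 MB = 1572864 bytes

# Returns true when the environment requests a thread machine stack of
# at least 1.5 MB. An unset variable converts to 0 and fails the check.
def stack_size_ok?(env = ENV)
  env['RUBY_THREAD_MACHINE_STACK_SIZE'].to_i >= REQUIRED_STACK_BYTES
end

unless stack_size_ok?
  warn 'Set RUBY_THREAD_MACHINE_STACK_SIZE=1572864 in the environment that launches Ruby'
end
```

The check compares with `>=`, matching the test in the constructor, which rejects only values below 1572864.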
Attributes
@!visibility private
Public Class Methods
Creates a new instance and connects to CPG. If a group name is provided, it will join that group. Otherwise it will only connect. This is so that you can establish the command callbacks first and avoid NotImplementedError exceptions.
@param group_name [String] Name of the group to join
# File lib/corosync_commander.rb, line 61
def initialize(group_name = nil)
  @cpg = Corosync::CPG.new
  @cpg.on_message {|*args| cpg_message(*args)}
  @cpg.on_confchg {|*args| cpg_confchg(*args)}
  @cpg.connect
  @cpg.fd.close_on_exec = true

  @quorum = Corosync::Quorum.new
  @quorum.on_notify {|*args| quorum_notify(*args)}
  @quorum.connect
  @quorum.fd.close_on_exec = true

  @cpg_members = nil

  @leader_pool = []
  @leader_pool.extend(Sync_m)

  # we can either share the msgid counter across all threads, or have a msgid counter on each thread and send the thread ID with each message. I prefer the former
  @next_execution_id = 0
  @next_execution_id_mutex = Mutex.new

  @execution_queues = {}
  @execution_queues.extend(Sync_m)

  @command_callbacks = CorosyncCommander::CallbackList.new

  if RUBY_ENGINE == 'ruby' and (Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.0.0') or ENV['RUBY_THREAD_MACHINE_STACK_SIZE'].to_i < 1572864) then
    abort "MRI Ruby must be >= 2.0 and RUBY_THREAD_MACHINE_STACK_SIZE must be >= 1572864"
  end

  join(group_name) if group_name
end
Public Instance Methods
@!attribute [r] commands
@return [CorosyncCommander::CallbackList] List of command callbacks
# File lib/corosync_commander.rb, line 307
def commands
  @command_callbacks
end
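The callback list is used in two places in this file: `register` in the class example and `[]` in the message handler. The following is a rough sketch of a registry with that shape, assuming a plain Hash underneath. `SketchCallbackList` and `dispatch_command` are hypothetical names, and the real CorosyncCommander::CallbackList may be implemented differently.

```ruby
# Hypothetical stand-in for CorosyncCommander::CallbackList.
class SketchCallbackList
  def initialize
    @callbacks = {}
  end

  # Store a block under a command name.
  def register(name, &block)
    @callbacks[name] = block
  end

  # Look up a block; returns nil when nothing is registered.
  def [](name)
    @callbacks[name]
  end
end

# Mirrors the lookup-then-call step of the message handler: the sender
# is passed first, followed by the command arguments.
def dispatch_command(callbacks, sender, name, args)
  callback = callbacks[name]
  raise NotImplementedError, "No callback registered for command '#{name}'" if callback.nil?
  callback.call(sender, *args)
end

commands = SketchCallbackList.new
commands.register('add') { |_sender, a, b| a + b }
puts dispatch_command(commands, 'node1:100', 'add', [2, 3]) # prints 5
```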
@!visibility private
# File lib/corosync_commander.rb, line 246
def cpg_confchg(member_list, left_list, join_list)
  @cpg_members = member_list

  if leader_position == -1 then # this will only happen on join
    @leader_pool.sync_synchronize(:EX) do
      @leader_pool.replace(member_list.to_a)
    end
  elsif left_list.size > 0 then
    @leader_pool.sync_synchronize(:EX) do
      @leader_pool.delete_if {|m| left_list.include?(m)}
    end
  end

  @confchg_callback.call(member_list, left_list, join_list) if @confchg_callback

  # we look for any members leaving the cluster, and if so we notify all threads that are waiting for a response that they may have just lost a node
  return if left_list.size == 0

  messages = left_list.map do |member|
    CorosyncCommander::Execution::Message.new(:sender => member, :type => 'leave')
  end

  @execution_queues.sync_synchronize(:SH) do
    @execution_queues.values.each do |queue|
      messages.each do |message|
        queue << message
      end
    end
  end
end
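The 'leave' messages queued here, together with the 'echo' message that carries the member list, give a waiting caller enough information to know when no more responses can arrive. The sketch below shows one way that bookkeeping could work. It is an assumption about the consumer side: `SketchMessage` and `collect_responses` are invented names, the echo is assumed to arrive before any response, and the real CorosyncCommander::Execution may behave differently.

```ruby
require 'thread'

# Hypothetical message shape holding only the fields used below.
SketchMessage = Struct.new(:sender, :type, :content)

# Drain the queue until every expected member has answered or left.
# The 'echo' message fixes the set of members to wait for.
def collect_responses(queue)
  pending = nil
  responses = {}
  loop do
    message = queue.pop
    case message.type
    when 'echo'
      pending = message.content.dup
    when 'response'
      responses[message.sender] = message.content
      pending.delete(message.sender) if pending
    when 'leave'
      # a departed member will never answer, so stop waiting for it
      pending.delete(message.sender) if pending
    end
    break if pending && pending.empty?
  end
  responses
end

queue = Queue.new
queue << SketchMessage.new('a', 'echo', %w[a b c])
queue << SketchMessage.new('a', 'response', 'host-a')
queue << SketchMessage.new('c', 'leave', nil)
queue << SketchMessage.new('b', 'response', 'host-b')
p collect_responses(queue)
```

Without the 'leave' message for `c`, the loop would block on `queue.pop` waiting for a reply that never comes.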
Used as a callback on receipt of a CPG message.
@param sender [Corosync::CPG::Member] Sender of the message
@param message [String] Data structure passed to @cpg.send
* message[0] == [Array<String>] Each string is "nodeid:pid" of the intended message recipients
* msgid == [Integer]
  * In the event of a new message, this, combined with `member`, will uniquely identify this message
  * In the event of a reply, this is the message ID sent in the original message
* type == [String] command/response/exception
* args == [Array]
  * In the event of a command, this will be the arguments passed to CorosyncCommander.send
  * In the event of a response, this will be the return value of the command handler
  * In the event of an exception, this will be the exception string and backtrace
@!visibility private
# File lib/corosync_commander.rb, line 171
def cpg_message(sender, message)
  message = CorosyncCommander::Execution::Message.from_cpg_message(sender, message)

  # These are the possible message classifications
  # Command echo (potentially also a command to us)
  # Response echo
  # Command to us
  # Response to us
  # Command to someone else
  # Response to someone else

  if message.type == 'command' and sender == @cpg.member then
    # It's a command echo
    execution_queue = nil
    @execution_queues.sync_synchronize(:SH) do
      execution_queue = @execution_queues[message.execution_id]
    end
    if !execution_queue.nil? then
      # someone is listening
      message_echo = message.dup
      message_echo.type = 'echo'
      message_echo.content = @cpg_members
      execution_queue << message_echo
    end
  elsif message.type == 'leader reset' then
    # The sender is requesting we reset their leader position
    # For remote node, act as if the node left.
    # For the local node, act as if we just joined.
    if sender != @cpg.member then
      @leader_pool.sync_synchronize(:EX) do
        @leader_pool.delete(sender)
      end
    else
      @leader_pool.sync_synchronize(:EX) do
        @leader_pool.replace(@cpg_members.to_a)
      end
    end
  elsif message.type != 'command' and message.recipients.include?(@cpg.member) then
    # It's a response to us
    execution_queue = nil
    @execution_queues.sync_synchronize(:SH) do
      execution_queue = @execution_queues[message.execution_id]
    end
    if !execution_queue.nil? then
      # someone is listening
      execution_queue << message
    end
  end

  if message.type == 'command' and (message.recipients.size == 0 or message.recipients.include?(@cpg.member)) then
    # It's a command to us
    begin
      # see if we've got a registered callback
      command_callback = nil

      command_name = message.content[0]
      command_callback = @command_callbacks[command_name]
      if command_callback.nil? then
        raise NotImplementedError, "No callback registered for command '#{command_name}'"
      end

      command_args = message.content[1]
      reply_value = command_callback.call(message.sender, *command_args)
      message_reply = message.reply(reply_value)
      @cpg.send(message_reply)
    # A bare rescue only catches StandardError. NotImplementedError descends
    # from ScriptError, so it is listed explicitly to be sent back to the sender.
    rescue StandardError, NotImplementedError => e
      $stderr.puts "Exception: #{e} (#{e.class})\n#{e.backtrace.join("\n")}"
      message_reply = message.reply([e.class, e.to_s, e.backtrace])
      message_reply.type = 'exception'
      @cpg.send(message_reply)
    end
  end
end
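The branching in the handler can be easier to follow as a pure function. The sketch below restates the same conditions without any CPG objects. The `classify` function and its symbol names are invented for illustration, and members are plain strings here rather than Corosync::CPG::Member objects.

```ruby
# Returns the roles a message plays for the local member `me`.
# A command sent by us to everyone is both an echo and a command to us.
def classify(type, sender, recipients, me)
  roles = []
  if type == 'command' && sender == me
    roles << :command_echo
  elsif type == 'leader reset'
    roles << :leader_reset
  elsif type != 'command' && recipients.include?(me)
    roles << :response_to_us
  end
  # An empty recipient list means broadcast.
  roles << :command_to_us if type == 'command' && (recipients.empty? || recipients.include?(me))
  roles
end

p classify('command', 'me', [], 'me')         # prints [:command_echo, :command_to_us]
p classify('response', 'other', ['me'], 'me') # prints [:response_to_us]
p classify('command', 'other', ['x'], 'me')   # prints []
```

Messages that yield an empty list (commands and responses addressed to someone else) are ignored by the handler.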
Execute a remote command.
@param recipients [Array<Corosync::CPG::Member>] List of recipients to send to, or an empty array to broadcast to all members of the group.
@param command [String] The name of the remote command to execute. If no such command exists on the remote node, a NotImplementedError exception will be raised when enumerating the results.
@param args Any further arguments will be passed to the command callback on the remote host.
@return [CorosyncCommander::Execution]
# File lib/corosync_commander.rb, line 316
def execute(recipients, command, *args)
  execution = CorosyncCommander::Execution.new(self, next_execution_id, recipients, command, args)

  message = CorosyncCommander::Execution::Message.new(:recipients => recipients, :execution_id => execution.id, :type => 'command', :content => [command, args])

  @execution_queues.synchronize(:EX) do
    @execution_queues[execution.id] = execution.queue
  end
  # Technique stolen from http://www.mikeperham.com/2010/02/24/the-trouble-with-ruby-finalizers/
  #TODO We definitely need a spec test to validate the execution object gets garbage collected
  ObjectSpace.define_finalizer(execution, execution_queue_finalizer(execution.id))

  @cpg.send(message)

  execution
end
Joins the specified group. This is provided separately from initialization so that callbacks can be registered before joining the group, which avoids NotImplementedError exceptions.
@param group_name [String] Name of group to join
@return [void]
# File lib/corosync_commander.rb, line 122
def join(group_name)
  start unless @dispatch_thread
  @cpg.join(group_name)
end
Indicates whether we are the group leader. If we are the leader, it means that we are the oldest member of the group. This is slightly different from just calling `leader_position == 0` in that if the position is -1 (meaning we haven't received the CPG confchg callback yet), we wait for the CPG join to complete.
@return [Boolean]
# File lib/corosync_commander.rb, line 355
def leader?
  # The way leadership works is that we record the members that were present when we joined the group in @leader_pool. Each time a node leaves the group, we remove them from @leader_pool. Once we become the only member in @leader_pool, we are the leader.
  # Now in the event that the cluster splits, this becomes complicated. Each side will see the members of the other side leaving. So each side will end up with their own leader. Since they can't talk to each other, having a leader in each group is perfectly fine. However when the 2 sides re-join, each side will see the members of the other side joining as new nodes, and both leaders will remain as leaders.
  # We solve this by using the quorum status. When we go from inquorate to quorate, we give up our position. We send a 'leader reset' command to the cluster which tells everyone to remove us from their @leader_pool. When we receive the message ourself, we set @leader_pool to the group members at that moment.
  # It doesn't matter if multiple members end up doing a 'leader reset' at the same time. It basically simulates the node leaving and then joining. Whoever performs the action first will move to the front. It will capture @leader_pool as the current members when it receives its own message, and as the other resets come in, it will remove those members. Leaving itself in front of the ones that just joined (and reset after). But it will still remain after all the members that didn't do a reset.

  position = nil
  loop do
    position = leader_position
    break if position != -1
    Thread.pass # let the dispatch thread run so we can get our join message
    # This isn't ideal as if the dispatch thread doesn't immediately complete the join, we start spinning.
    # But the only other way is to use condition variables, which combined with Sync_m, would just be really messy and stupidly complex (and I don't want to go to a plain mutex and lose the ability to use shared locks).
  end
  position == 0
end
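The leadership rule described in the comments can be modelled without a cluster. Below is a small simulation of one node's view of the pool, written as a sketch: `SketchLeaderPool` is a hypothetical class, members are plain strings, and locking is omitted.

```ruby
# Hypothetical single-node model of the @leader_pool bookkeeping.
class SketchLeaderPool
  def initialize(me)
    @me = me
    @pool = []
  end

  # Mirrors cpg_confchg: snapshot the members on our own join, then
  # only ever remove members that leave. Later joiners are never added.
  def confchg(members, left)
    if position == -1
      @pool = members.dup
    else
      @pool -= left
    end
  end

  # Mirrors 'leader reset': a remote sender is treated as having left,
  # our own reset is treated as a fresh join.
  def leader_reset(sender, members)
    if sender == @me
      @pool = members.dup
    else
      @pool.delete(sender)
    end
  end

  # Number of members still in the group that were present before we joined.
  def position
    @pool.size - 1
  end

  def leader?
    position.zero?
  end
end

node = SketchLeaderPool.new('c')
node.confchg(%w[a b c], [])    # c joins a group that already holds a and b
node.confchg(%w[b c d], %w[a]) # a leaves and d joins
node.confchg(%w[c d], %w[b])   # b leaves
puts node.leader?              # prints true
```

Node `d` joined after `c`, so it never enters `c`'s pool and cannot delay `c` becoming leader.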
Gets the member’s position in the leadership queue. The leadership position is simply how many nodes currently in the group were in the group before we joined.
@return [Integer]
# File lib/corosync_commander.rb, line 345
def leader_position
  @leader_pool.synchronize(:SH) do
    @leader_pool.size - 1
  end
end
Leave the active CPG group. Will not stop quorum notifications. If you wish to stop quorum as well you should use {#stop} instead.
@return [void]
# File lib/corosync_commander.rb, line 131
def leave
  @cpg.leave
end
List of current members
@return [Array<Corosync::CPG::Member>] List of members currently in the group
# File lib/corosync_commander.rb, line 381
def members
  @cpg_members
end
Callback to execute when the CPG configuration changes
@yieldparam member_list [Array<Corosync::CPG::Member>] List of members in group after change
@yieldparam left_list [Array<Corosync::CPG::Member>] List of members which left the group
@yieldparam join_list [Array<Corosync::CPG::Member>] List of members which joined the group
# File lib/corosync_commander.rb, line 281
def on_confchg(&block)
  @confchg_callback = block
end
Callback to execute when the quorum state changes
@yieldparam quorate [Boolean] Whether cluster is quorate
@yieldparam member_list [Array] List of node IDs in the cluster after change
# File lib/corosync_commander.rb, line 301
def on_quorumchg(&block)
  @quorumchg_callback = block
end
Indicates whether cluster is quorate.
@return [Boolean]
# File lib/corosync_commander.rb, line 375
def quorate?
  @quorate
end
@!visibility private
# File lib/corosync_commander.rb, line 286
def quorum_notify(quorate, node_list)
  return if @quorate == quorate # didn't change
  @quorate = quorate

  if quorate then
    # we just became quorate
    @cpg.send(CorosyncCommander::Execution::Message.new(:recipients => [], :type => 'leader reset'))
    # 'leader reset' simulates a leave and then a join of this node
  end

  @quorumchg_callback.call(quorate, node_list) if @quorumchg_callback
end
Starts watching for notifications
@return [void]
# File lib/corosync_commander.rb, line 96
def start
  @quorum.start

  @dispatch_thread = Thread.new do
    begin
      loop do
        select_ready = select([@cpg.fd, @quorum.fd], [], [])
        if select_ready[0].include?(@quorum.fd) then
          @quorum.dispatch
        end
        if select_ready[0].include?(@cpg.fd) then
          @cpg.dispatch
        end
      end
    rescue Exception => e
      # something happened that we don't know how to handle. We need to bail out.
      $stderr.write "Fatal exception: #{e.to_s} (#{e.class})\n#{e.backtrace.join("\n")}\n"
      exit(1)
    end
  end
end
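The dispatch thread is a standard select loop over two file descriptors. The following sketch shows a single iteration of that pattern using an ordinary pipe in place of `@cpg.fd` and `@quorum.fd`. The `dispatch_once` helper and the handler lambdas are illustrative assumptions, not gem API.

```ruby
# Wait until at least one IO is readable, then run the handler registered
# for each ready IO. `handlers` maps an IO object to a callable.
def dispatch_once(handlers, timeout = nil)
  ready, = IO.select(handlers.keys, [], [], timeout)
  return [] if ready.nil? # IO.select returns nil when the timeout expires
  ready.each { |io| handlers[io].call(io) }
  ready
end

reader, writer = IO.pipe
writer.write('ping')
writer.flush
dispatch_once({ reader => ->(io) { puts io.read_nonblock(4) } }, 1) # prints ping
```

The loop in `start` passes no timeout, so it blocks until Corosync has something to deliver.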
Shuts down the dispatch thread and disconnects CPG
@return [void]
# File lib/corosync_commander.rb, line 137
def stop
  @dispatch_thread.kill if !@dispatch_thread.nil?
  @dispatch_thread = nil

  @cpg.close if !@cpg.nil?
  @cpg = nil
  @cpg_members = nil

  @quorum.finalize if !@quorum.nil?
  @quorum = nil
end
Private Instance Methods
This is so that we remove our queue from the execution queue list when we get garbage collected.
# File lib/corosync_commander.rb, line 333
def execution_queue_finalizer(execution_id)
  proc do
    @execution_queues.synchronize(:EX) do
      @execution_queues.delete(execution_id)
    end
  end
end
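The finalizer proc is built in its own method so that its closure captures only the ID, not the object being finalized. A proc that referenced the object itself would keep it reachable and it would never be collected. Here is a stripped-down sketch of the same pattern with a plain Hash and no locking. `registry_finalizer` and `Tracked` are hypothetical names.

```ruby
# Build the finalizer in a separate method: the closure holds only
# `registry` and `key`, never the tracked object.
def registry_finalizer(registry, key)
  proc { |_object_id| registry.delete(key) }
end

class Tracked; end

registry = { t1: 'queue for t1' }
tracked = Tracked.new
ObjectSpace.define_finalizer(tracked, registry_finalizer(registry, :t1))

# When `tracked` is garbage collected, Ruby calls the proc with the
# object's ID and the registry entry is removed. Timing depends on the GC.
```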
# File lib/corosync_commander.rb, line 149
def next_execution_id
  id = nil
  @next_execution_id_mutex.synchronize do
    id = @next_execution_id += 1
  end
  id
end
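The mutex makes the increment atomic, so two threads calling `execute` at the same time never receive the same execution ID. A self-contained sketch of that guarantee follows. `SketchIdCounter` is a hypothetical class that mirrors only the counter logic.

```ruby
# Hypothetical counter mirroring the mutex-guarded increment.
class SketchIdCounter
  def initialize
    @next_id = 0
    @mutex = Mutex.new
  end

  # The block's value (the incremented counter) is returned by synchronize.
  def next_id
    @mutex.synchronize { @next_id += 1 }
  end
end

counter = SketchIdCounter.new
threads = Array.new(4) { Thread.new { Array.new(250) { counter.next_id } } }
ids = threads.flat_map(&:value)
puts ids.uniq.size # prints 1000
```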