class CorosyncCommander

This class provides a simplified interface to Corosync::CPG. Its main use case is sending commands to remote servers and waiting for their responses.

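This library takes care of:

* running a background thread that dispatches CPG and quorum events;
* routing responses (and remote exceptions) back to the execution that sent the command;
* tracking group membership and group leadership.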

@example

cc = CorosyncCommander.new
cc.commands.register('shell command') do |sender, shellcmd|
  %x{#{shellcmd}}
end
cc.join('my group')

exe = cc.execute([], 'shell command', 'hostname')

enum = exe.to_enum
hostnames = []
begin
  enum.each do |sender, response|
    hostnames << response
  end
rescue CorosyncCommander::RemoteException => e
  puts "Caught remote exception: #{e}"
  retry
end

puts "Hostnames: #{hostnames.join(' ')}"

IMPORTANT: This will not work without tuning Ruby.

You cannot use this with MRI Ruby older than 2.0, and even with 2.0 you must tune Ruby. Corosync CPG (as of 1.4.3) allocates a 1 MB buffer on the stack, while Ruby 2.0 only allocates a 512 KB stack for threads. This gem handles incoming messages on a thread, so an older Ruby (or an untuned 2.0) will segfault.

Ruby 2.0 allows increasing the thread stack size through the RUBY_THREAD_MACHINE_STACK_SIZE environment variable. The advised value is 1.5 MB (1572864 bytes); the constructor aborts if the variable is set any lower.

RUBY_THREAD_MACHINE_STACK_SIZE=1572864 ruby yourscript.rb

Attributes

cpg[R]
dispatch_thread[R]

@!visibility private

execution_queues[R]

Public Class Methods

new(group_name = nil) click to toggle source

Creates a new instance and connects to CPG and to the quorum service. If a group name is provided, it will join that group; otherwise it will only connect. Connecting without joining lets you register command callbacks first, so you avoid NotImplementedError exceptions. @param group_name [String] Name of the group to join

# File lib/corosync_commander.rb, line 61
# Creates a new commander, connects to Corosync CPG and quorum, and
# optionally joins a group.
#
# The interpreter/stack-size check runs before anything else so that an
# unsupported or untuned MRI aborts before any Corosync connection is opened
# or callback registered.
#
# @param group_name [String, nil] Name of the group to join. When nil we only
#   connect, so command callbacks can be registered before joining.
def initialize(group_name = nil)
  # CPG needs a larger thread stack than MRI provides by default.
  min_stack_size = 1_572_864
  if RUBY_ENGINE == 'ruby' && (Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.0.0') || ENV['RUBY_THREAD_MACHINE_STACK_SIZE'].to_i < min_stack_size)
    abort "MRI Ruby must be >= 2.0 and RUBY_THREAD_MACHINE_STACK_SIZE must be >= 1572864"
  end

  @cpg = Corosync::CPG.new
  @cpg.on_message { |*args| cpg_message(*args) }
  @cpg.on_confchg { |*args| cpg_confchg(*args) }
  @cpg.connect
  @cpg.fd.close_on_exec = true

  @quorum = Corosync::Quorum.new
  @quorum.on_notify { |*args| quorum_notify(*args) }
  @quorum.connect
  @quorum.fd.close_on_exec = true

  @cpg_members = nil

  # Members that were in the group before us; see #leader?.
  @leader_pool = []
  @leader_pool.extend(Sync_m)

  # A single msgid counter shared by all threads, rather than one counter per
  # thread plus the thread ID in every message.
  @next_execution_id = 0
  @next_execution_id_mutex = Mutex.new

  # execution_id => reply queue of the waiting Execution.
  @execution_queues = {}
  @execution_queues.extend(Sync_m)

  @command_callbacks = CorosyncCommander::CallbackList.new

  join(group_name) if group_name
end

Public Instance Methods

commands() click to toggle source

@!attribute [r] commands @return [CorosyncCommander::CallbackList] List of command callbacks

# File lib/corosync_commander.rb, line 307
# Registry of handlers run when a remote node executes a command on us.
#
# @return [CorosyncCommander::CallbackList] the list created in #initialize
def commands
  @command_callbacks
end
cpg_confchg(member_list, left_list, join_list) click to toggle source

@!visibility private

# File lib/corosync_commander.rb, line 246
# CPG configuration-change callback.
#
# Records the new member list, maintains the leadership pool, forwards the
# change to the user's confchg callback (if one is set), and finally pushes a
# 'leave' message into every registered execution queue for each member that
# departed.
#
# @param member_list [Array<Corosync::CPG::Member>] members after the change
# @param left_list [Array<Corosync::CPG::Member>] members that left
# @param join_list [Array<Corosync::CPG::Member>] members that joined
# @!visibility private
def cpg_confchg(member_list, left_list, join_list)
  @cpg_members = member_list

  if leader_position == -1
    # Empty pool: this only happens on our own join, so everyone currently
    # present (including us) forms the initial pool.
    @leader_pool.sync_synchronize(:EX) { @leader_pool.replace(member_list.to_a) }
  elsif left_list.size > 0
    @leader_pool.sync_synchronize(:EX) do
      @leader_pool.reject! { |member| left_list.include?(member) }
    end
  end

  @confchg_callback.call(member_list, left_list, join_list) if @confchg_callback

  # Only departures matter to pending executions.
  return if left_list.size == 0

  # Each waiting execution may have been expecting a reply from a node that
  # just left, so tell all of them about every departure.
  leave_notices = left_list.map do |member|
    CorosyncCommander::Execution::Message.new(:sender => member, :type => 'leave')
  end

  @execution_queues.sync_synchronize(:SH) do
    @execution_queues.values.each do |queue|
      leave_notices.each { |notice| queue << notice }
    end
  end
end
cpg_message(sender, message) click to toggle source

Used as a callback on receipt of a CPG message @param sender [Corosync::CPG::Member] Sender of the message @param message [String] data structure passed to @cpg.send

* message[0] == [Array<String>] Each string is "nodeid:pid" of the intended message recipients
* msgid == [Integer]
  * In the event of a new message, this, combined with `member` will uniquely identify this message
  * In the event of a reply, this is the message ID sent in the original message
* type == [String] command/response/exception
* args == [Array]
  * In the event of a command, this will be the arguments passed to CorosyncCommander.send
  * In the event of a response, this will be the return value of the command handler
  * In the event of an exception, this will be the exception string and backtrace

@!visibility private

# File lib/corosync_commander.rb, line 171
# CPG message callback, invoked for every message delivered to this node,
# including echoes of messages we sent ourselves.
#
# - Echo of our own command: forwarded to the waiting execution queue as an
#   'echo' whose content is the current member list.
# - 'leader reset': the sender is removed from the leader pool, or, if we
#   sent it, the pool is reset to the current members.
# - Non-command addressed to us: forwarded to the waiting execution queue.
# - Command addressed to us (or broadcast with no recipients): the
#   registered handler is run and its return value sent back as a
#   'response', or any error raised is sent back as an 'exception'.
#
# @param sender [Corosync::CPG::Member] Sender of the message
# @param message [String] Payload as passed to @cpg.send
# @!visibility private
def cpg_message(sender, message)
  message = CorosyncCommander::Execution::Message.from_cpg_message(sender, message)

  # Possible classifications:
  #   command echo (potentially also a command to us), response echo,
  #   command to us, response to us,
  #   command to someone else, response to someone else
  if message.type == 'command' && sender == @cpg.member
    # Our own command came back.
    execution_queue = nil
    @execution_queues.sync_synchronize(:SH) do
      execution_queue = @execution_queues[message.execution_id]
    end
    unless execution_queue.nil?
      # Someone is listening: tell them who was in the group on delivery.
      message_echo = message.dup
      message_echo.type = 'echo'
      message_echo.content = @cpg_members
      execution_queue << message_echo
    end
  elsif message.type == 'leader reset'
    # The sender asks us to reset its leader position: for a remote node act
    # as if it left, for ourselves act as if we just joined.
    if sender != @cpg.member
      @leader_pool.sync_synchronize(:EX) do
        @leader_pool.delete(sender)
      end
    else
      @leader_pool.sync_synchronize(:EX) do
        @leader_pool.replace(@cpg_members.to_a)
      end
    end
  elsif message.type != 'command' && message.recipients.include?(@cpg.member)
    # A response addressed to us.
    execution_queue = nil
    @execution_queues.sync_synchronize(:SH) do
      execution_queue = @execution_queues[message.execution_id]
    end
    execution_queue << message unless execution_queue.nil?
  end

  return unless message.type == 'command' && (message.recipients.size == 0 || message.recipients.include?(@cpg.member))

  # A command for us. NotImplementedError is a ScriptError, not a
  # StandardError, so a bare `rescue` would let it escape into the dispatch
  # thread, whose catch-all handler exits the process. Rescue it explicitly
  # so the caller receives it as a remote exception, as documented.
  begin
    command_name = message.content[0]
    command_callback = @command_callbacks[command_name]
    if command_callback.nil?
      raise NotImplementedError, "No callback registered for command '#{command_name}'"
    end

    command_args = message.content[1]
    reply_value = command_callback.call(message.sender, *command_args)
    @cpg.send(message.reply(reply_value))
  rescue StandardError, NotImplementedError => e
    $stderr.puts "Exception: #{e} (#{e.class})\n#{e.backtrace.join("\n")}"
    message_reply = message.reply([e.class, e.to_s, e.backtrace])
    message_reply.type = 'exception'
    @cpg.send(message_reply)
  end
end
execute(recipients, command, *args) click to toggle source

Execute a remote command. @param recipients [Array<Corosync::CPG::Member>] List of recipients to send to, or an empty array to broadcast to all members of the group. @param command [String] The name of the remote command to execute. If no such command exists on the remote node a NotImplementedError exception will be raised when enumerating the results. @param args Any further arguments will be passed to the command callback on the remote host. @return [CorosyncCommander::Execution]

# File lib/corosync_commander.rb, line 316
# Sends a command to the group (or a subset of it) and returns a handle for
# collecting the replies.
#
# @param recipients [Array<Corosync::CPG::Member>] Members to send to, or an
#   empty array to broadcast to every member of the group.
# @param command [String] Name of the remote command. If a remote node has no
#   such command, a NotImplementedError is raised when enumerating results.
# @param args Further arguments passed to the remote command callback.
# @return [CorosyncCommander::Execution]
def execute(recipients, command, *args)
  execution = CorosyncCommander::Execution.new(self, next_execution_id, recipients, command, args)

  message = CorosyncCommander::Execution::Message.new(:recipients => recipients, :execution_id => execution.id, :type => 'command', :content => [command, args])

  # Register the reply queue before sending: replies whose execution_id has
  # no registered queue are dropped by #cpg_message.
  @execution_queues.sync_synchronize(:EX) do
    @execution_queues[execution.id] = execution.queue
  end
  # Drop the queue once the execution is garbage collected. The finalizer proc
  # is built in a separate method so it does not capture `execution` itself.
  # Technique from http://www.mikeperham.com/2010/02/24/the-trouble-with-ruby-finalizers/
  # TODO: add a spec verifying the execution object actually gets garbage collected.
  ObjectSpace.define_finalizer(execution, execution_queue_finalizer(execution.id))

  @cpg.send(message)

  execution
end
join(group_name) click to toggle source

Joins the specified group. This is separate from initialization so that callbacks can be registered before joining the group, which means you won't get NotImplementedError exceptions. @param group_name [String] Name of group to join @return [void]

# File lib/corosync_commander.rb, line 122
# Joins the named CPG group, starting the dispatch thread on first use.
#
# Kept separate from #initialize so command callbacks can be registered
# before any commands can arrive.
#
# @param group_name [String] Name of the group to join
# @return [void]
def join(group_name)
  unless @dispatch_thread
    # First join on this instance: begin processing CPG/quorum events.
    start
  end

  @cpg.join(group_name)
end
leader?() click to toggle source

Indicates whether we are the group leader. Being the leader means we are the oldest member of the group. This is slightly different from just calling `leader_position == 0`: if the position is -1 (meaning we haven't received the CPG confchg callback yet), we wait for the CPG join to complete. @return [Boolean]

# File lib/corosync_commander.rb, line 355
# Whether this node is currently the group leader (the oldest member).
#
# Waits, yielding via Thread.pass, while #leader_position is -1, i.e. until
# the confchg for our own join has populated the leader pool.
#
# @return [Boolean]
def leader?
  # How leadership works: @leader_pool holds the members present when we
  # joined, and each member that leaves is removed from it. Once we are the
  # only entry left, we are the leader.
  #
  # On a cluster split each side sees the other side leave and elects its own
  # leader, which is fine while they cannot talk. When the sides re-join,
  # each sees the other's members as newly joined, so both leaders would stay
  # leaders. To resolve that, a node going from inquorate to quorate gives up
  # its position by broadcasting 'leader reset': everyone else drops it from
  # their pool, and on receiving its own message it resets its pool to the
  # current group members.
  #
  # Concurrent resets are harmless. They behave like leave+join: whoever
  # resets first ends up ahead of the others that reset after it, but still
  # behind every member that did not reset.
  position = leader_position
  while position == -1
    # Let the dispatch thread process our join. This can spin if the join is
    # slow, but a condition variable combined with Sync_m shared locks would
    # be far more complex.
    Thread.pass
    position = leader_position
  end
  position == 0
end
leader_position() click to toggle source

Gets the member’s position in the leadership queue. The leadership position is simply how many nodes currently in the group were in the group before we joined. @return [Integer]

# File lib/corosync_commander.rb, line 345
# Position of this member in the leadership queue: the number of current
# group members that were already in the group when we joined.
#
# Returns -1 while the leader pool is still empty (before our join confchg).
#
# @return [Integer]
def leader_position
  # Shared lock, using the same Sync_m entry point as every other pool access.
  @leader_pool.sync_synchronize(:SH) do
    @leader_pool.size - 1
  end
end
leave() click to toggle source

Leave the active CPG group. Will not stop quorum notifications. If you wish to stop quorum as well you should use {#stop} instead. @return [void]

# File lib/corosync_commander.rb, line 131
# Leaves the currently joined CPG group.
#
# Quorum notifications keep arriving; use #stop to shut everything down.
#
# @return [void]
def leave
  @cpg.leave
end
members() click to toggle source

List of current members @return [Array<Corosync::CPG::Member>] List of members currently in the group

# File lib/corosync_commander.rb, line 381
# Member list recorded from the most recent CPG confchg.
#
# @return [Array<Corosync::CPG::Member>, nil] nil before the first confchg
#   and after #stop
def members
  @cpg_members
end
on_confchg(&block) click to toggle source

Callback to execute when the CPG configuration changes @yieldparam member_list [Array<Corosync::CPG::Member>] List of members in group after change @yieldparam left_list [Array<Corosync::CPG::Member>] List of members which left the group @yieldparam join_list [Array<Corosync::CPG::Member>] List of members which joined the group

# File lib/corosync_commander.rb, line 281
# Registers the block to run on every CPG configuration change, replacing
# any previously registered one.
#
# @yieldparam member_list [Array<Corosync::CPG::Member>] members after the change
# @yieldparam left_list [Array<Corosync::CPG::Member>] members that left
# @yieldparam join_list [Array<Corosync::CPG::Member>] members that joined
def on_confchg(&block)
  @confchg_callback = block
end
on_quorumchg(&block) click to toggle source

Callback to execute when the quorum state changes @yieldparam quorate [Boolean] Whether cluster is quorate @yieldparam member_list [Array] List of node IDs in the cluster after change

# File lib/corosync_commander.rb, line 301
# Registers the block to run when the quorum state changes, replacing any
# previously registered one.
#
# @yieldparam quorate [Boolean] whether the cluster is now quorate
# @yieldparam member_list [Array] node IDs in the cluster after the change
def on_quorumchg(&block)
  @quorumchg_callback = block
end
quorate?() click to toggle source

Indicates whether cluster is quorate. @return [Boolean]

# File lib/corosync_commander.rb, line 375
# Quorum state recorded from the most recent quorum notification.
#
# @return [Boolean, nil] nil until the first notification has been handled
def quorate?
  @quorate
end
quorum_notify(quorate, node_list) click to toggle source

@!visibility private

# File lib/corosync_commander.rb, line 286
# Quorum notification callback; acts only when the quorum state changes.
#
# On becoming quorate, broadcasts a 'leader reset' so this node gives up its
# leadership position (see #leader?). Then forwards the change to the user's
# quorum callback, if one is set.
#
# @param quorate [Boolean] new quorum state
# @param node_list [Array] node IDs in the cluster
# @!visibility private
def quorum_notify(quorate, node_list)
  # Ignore repeated notifications for the same state.
  return if @quorate == quorate

  @quorate = quorate

  if quorate
    # 'leader reset' simulates this node leaving and then re-joining.
    reset_request = CorosyncCommander::Execution::Message.new(:recipients => [], :type => 'leader reset')
    @cpg.send(reset_request)
  end

  @quorumchg_callback.call(quorate, node_list) if @quorumchg_callback
end
start() click to toggle source

Starts watching for notifications @return [void]

# File lib/corosync_commander.rb, line 96
# Starts processing Corosync notifications.
#
# Starts quorum tracking, then spawns a background thread that waits on both
# the CPG and quorum file descriptors and dispatches whichever is readable.
# Any exception escaping the loop is treated as fatal: it is written to
# stderr and exit(1) is called.
#
# @return [void]
def start
  @quorum.start

  @dispatch_thread = Thread.new do
    begin
      loop do
        readable = select([@cpg.fd, @quorum.fd], [], [])[0]
        @quorum.dispatch if readable.include?(@quorum.fd)
        @cpg.dispatch if readable.include?(@cpg.fd)
      end
    rescue Exception => e # deliberate catch-all: we cannot recover, so bail out
      $stderr.write "Fatal exception: #{e.to_s} (#{e.class})\n#{e.backtrace.join("\n")}\n"
      exit(1)
    end
  end
end
stop() click to toggle source

Shuts down the dispatch thread and disconnects CPG @return [void]

# File lib/corosync_commander.rb, line 137
# Shuts down the dispatch thread, then closes the CPG and quorum handles.
#
# Safe to call more than once; later calls find everything already nil.
#
# @return [void]
def stop
  if @dispatch_thread
    @dispatch_thread.kill
    # Wait until the thread has really terminated, so it cannot still be
    # inside @cpg.dispatch / @quorum.dispatch while those handles are closed
    # below. (When called from the dispatch thread itself, kill does not
    # return, so this join is never reached.)
    @dispatch_thread.join
  end
  @dispatch_thread = nil

  @cpg.close unless @cpg.nil?
  @cpg = nil
  @cpg_members = nil

  @quorum.finalize unless @quorum.nil?
  @quorum = nil
end

Private Instance Methods

execution_queue_finalizer(execution_id) click to toggle source

This is so that we remove our queue from the execution queue list when we get garbage collected.

# File lib/corosync_commander.rb, line 333
# Builds the finalizer proc that removes an execution's reply queue from
# @execution_queues once the execution object has been garbage collected.
#
# This lives in its own method so the returned proc closes over only this
# commander and +execution_id+, never the execution object it finalizes.
#
# @param execution_id [Integer] ID under which the queue was registered
# @return [Proc] the finalizer; any arguments passed to it are ignored
def execution_queue_finalizer(execution_id)
  proc do
    # Exclusive lock, using the same Sync_m entry point as the other accesses.
    @execution_queues.sync_synchronize(:EX) do
      @execution_queues.delete(execution_id)
    end
  end
end
next_execution_id() click to toggle source
# File lib/corosync_commander.rb, line 149
# Allocates the next execution ID from the per-instance counter shared by all
# threads.
#
# @return [Integer] the counter value after incrementing it under the mutex
def next_execution_id
  # Mutex#synchronize returns the block's value, i.e. the incremented counter.
  @next_execution_id_mutex.synchronize { @next_execution_id += 1 }
end