module CPEE::Notifications

Public Class Methods

implementation(id,opts) click to toggle source
# File lib/cpee/implementation_notifications.rb, line 21
def self::implementation(id,opts)
  Proc.new do
    if CPEE::Persistence::exists?(id,opts)
      on resource "notifications" do
        run CPEE::Notifications::Overview if get
        on resource "topics" do
          run CPEE::Notifications::Topics, opts if get
        end
        on resource "subscriptions" do
          run CPEE::Notifications::Subscriptions, id, opts if get
          run CPEE::Notifications::CreateSubscription, id, opts if post 'create_subscription'
          on resource do
            run CPEE::Notifications::Subscription, id, opts if get
            run CPEE::Notifications::UpdateSubscription, id, opts if put 'change_subscription'
            run CPEE::Notifications::DeleteSubscription, id, opts if delete
            on resource 'sse' do
              run CPEE::Notifications::SSE, id, opts if sse
            end
          end
        end
      end
    else
      run CPEE::FAIL
    end
  end
end
sse_distributor(opts) click to toggle source
# File lib/cpee/implementation_notifications.rb, line 186
def self::sse_distributor(opts) #{{{
  conn = opts[:redis_dyn].call "Server SSE"
  conn.psubscribe('forward:*','event:state/change') do |on|
    on.pmessage do |pat, what, message|
      if pat == 'forward:*'
        id, key = what.match(/forward:([^\/]+)\/(.+)/).captures
        if sse = opts.dig(:sse_connections,id,key)
          sse.send message
        else
          DeleteSubscription::set(id,opts,key)
        end
      elsif pat == 'event:state/change'
        mess = JSON.parse(message[message.index(' ')+1..-1])
        state = mess.dig('content','state')
        if state == 'finished' || state == 'abandoned'
          opts.dig(:sse_connections,mess.dig('instance').to_s)&.each do |key,sse|
            EM.add_timer(10) do # just to be sure that all messages arrived. 10 seconds should be enough ... we think ... therefore we are (not sure)
              sse.close
            end
          end
        end
      end
    end
  end
  conn.close
end
sse_heartbeat(opts) click to toggle source
# File lib/cpee/implementation_notifications.rb, line 212
def self::sse_heartbeat(opts) #{{{
  opts.dig(:sse_connections).each do |id,keys|
    keys.each do |key,sse|
      sse.send_with_id('heartbeat', '42') unless sse&.closed?
    end
  end
end