module EventMachine::Protocols::Memcache

Implements the Memcache protocol (code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt). Requires memcached >= 1.2.4 with noreply support.

Usage example

# Usage walkthrough; every call below is issued inside the EM.run block.
EM.run{
  cache = EM::P::Memcache.connect 'localhost', 11211

  # No block is given, so each of these is sent with "noreply".
  cache.set :a, 'hello'
  cache.set :b, 'hi'
  cache.set :c, 'how are you?'
  cache.set :d, ''

  # get yields one value per requested key, in the order the keys were given;
  # get_hash yields a single hash keyed by the original arguments.
  cache.get(:a){ |v| p v }
  cache.get_hash(:a, :b, :c, :d){ |v| p v }
  cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }

  # Keys the server returns no value for (here :z and :y, if never stored)
  # are yielded as nil.
  cache.get(:a,:z,:b,:y,:d){ |a,z,b,y,d| p [a,z,b,y,d] }

  # Passing a block to set/del requests a reply and runs the block on it.
  cache.get(:missing){ |m| p [:missing=, m] }
  cache.set(:missing, 'abc'){ p :stored }
  cache.get(:missing){ |m| p [:missing=, m] }
  cache.del(:missing){ p :deleted }
  cache.get(:missing){ |m| p [:missing=, m] }
}

Constants

Cdeleted

@private

Cdelimiter

@private

Cempty

@private

Cend

@private

Cerror

@private

Cstored

@private

Cunknown

@private

Public Class Methods

connect(host = 'localhost', port = 11211) click to toggle source

Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4)

# File lib/em/protocols/memcache.rb, line 114
# Opens a connection to a memcached server (which must support NOREPLY,
# i.e. memcached >= 1.2.4) by delegating to EM.connect with this module as
# the handler.
#
# The host and port are passed a second time as trailing arguments;
# presumably EM.connect forwards them to the handler's initialize.
#
# @param host [String] server host name
# @param port [Integer] server TCP port
# @return the result of EM.connect
def self.connect(host = 'localhost', port = 11211)
  handler_args = [host, port]
  EM.connect(host, port, self, *handler_args)
end
new(host, port = 11211) click to toggle source

@private

# File lib/em/protocols/memcache.rb, line 134
# @private
# Stores the server address in @host and @port.
#
# @param host [String] server host name
# @param port [Integer] server TCP port
def initialize(host, port = 11211)
  @host = host
  @port = port
end

Public Instance Methods

connection_completed() click to toggle source

@private

# File lib/em/protocols/memcache.rb, line 139
# @private
# Resets all per-connection state and then calls succeed.
#
# Line framing is done by hand in receive_data; set_delimiter and
# set_line_mode are deliberately not used here.
def connection_completed
  # One queue of pending reply callbacks per command type.
  @get_cbs, @set_cbs, @del_cbs = [], [], []

  # Values collected from VALUE lines until the matching END arrives.
  @values = {}

  @connected = true
  @reconnecting = false
  succeed
end
del(key, expires = 0, &cb)
Alias for: delete
delete(key, expires = 0, &cb) click to toggle source

Delete the value associated with a key

cache.del :a
cache.del(:b){ puts "deleted the value!" }
# File lib/em/protocols/memcache.rb, line 105
# Deletes the value stored under +key+.
#
# The key is normalised the same way get normalises its keys (converted to
# a string, every whitespace character replaced by "_"), so a key containing
# whitespace or CR/LF cannot break the command line and refers to the same
# entry that get reads.
#
# @param key [#to_s] key to delete
# @param expires [Integer] value sent as the second argument of the delete command
# @yield [Boolean] optional; process_cmd calls it with true on DELETED and
#   false on NOT_FOUND. Without a block the command is sent with "noreply".
def delete(key, expires = 0, &cb)
  callback do
    safe_key = key.to_s.gsub(/\s/, '_')
    suffix = cb ? '' : ' noreply'
    send_data "delete #{safe_key} #{expires}#{suffix}\r\n"
    # Only queue a callback when a reply was requested.
    @del_cbs << cb if cb
  end
end
Also aliased as: del
get(*keys) { |*values| ... } click to toggle source

Get the value associated with one or multiple keys

cache.get(:a){ |v| p v }
cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
# File lib/em/protocols/memcache.rb, line 61
# Fetches the values stored under one or more keys.
#
# Each key is converted to a string with every whitespace character replaced
# by "_" before it is sent. The block later receives one argument per key,
# in the order given; a key with no returned value yields nil.
#
# @param keys [Array<#to_s>] keys to fetch
# @yield [*values] one value (or nil) per requested key
# @raise [ArgumentError] when no block is given
def get(*keys)
  raise ArgumentError unless block_given?

  callback do
    wire_keys = keys.map { |name| name.to_s.gsub(/\s/, '_') }
    request = "get #{wire_keys.join(' ')}\r\n"
    send_data request

    # Called by process_cmd with the hash of values collected up to END.
    deliver = proc do |values|
      found = wire_keys.map { |name| values[name] }
      yield(*found)
    end
    @get_cbs << [wire_keys, deliver]
  end
end
get_hash(*keys) { |hash| ... } click to toggle source

Gets multiple values as a hash

cache.get_hash(:a, :b, :c, :d){ |h| puts h[:a] }
# File lib/em/protocols/memcache.rb, line 92
# Fetches several keys and yields the results as a single hash.
#
# The hash is keyed by the arguments exactly as they were passed in (for
# example symbols), not by the string form sent to the server. Keys and
# values are paired positionally in one pass rather than looking up each
# key's index, which was quadratic in the number of keys.
#
# @param keys [Array<#to_s>] keys to fetch
# @yield [Hash] maps each requested key to its value, or nil when absent
# @raise [ArgumentError] when no block is given
def get_hash(*keys)
  raise ArgumentError unless block_given?

  get(*keys) do |*values|
    yield Hash[keys.zip(values)]
  end
end
process_cmd(line) click to toggle source
# File lib/em/protocols/memcache.rb, line 175
# Handles one reply line from the server (surrounding whitespace is
# stripped before matching).
#
# * VALUE lines pull the announced number of characters out of @buffer and
#   store them in @values under the key from the line.
# * END hands the collected @values to the oldest pending get callback and
#   starts a fresh @values hash.
# * STORED / DELETED / NOT_FOUND call the oldest pending set or delete
#   callback with true, true and false respectively.
# * Any other line is printed with p as [:MEMCACHE_UNKNOWN, line].
#
# @param line [String] one delimiter-terminated line taken from the buffer
# @raise [ParserError] when a VALUE payload is not fully buffered yet
def process_cmd(line)
  case line.strip
  when /^VALUE\s+(.+?)\s+(\d+)\s+(\d+)/ # VALUE <key> <flags> <bytes>
    key = $1
    length = Integer($3)
    # The payload and its trailing \r\n must both be buffered already;
    # otherwise the caller has to wait for more data and retry.
    raise ParserError if @buffer.size < length + 2

    @values[key] = @buffer.slice!(0, length)
    @buffer.slice!(0, 2) # \r\n

  when Cend # END
    pending = @get_cbs.shift
    if pending
      _keys, handler = pending
      handler.call(@values)
    end
    @values = {}

  when Cstored # STORED
    handler = @set_cbs.shift
    handler.call(true) if handler

  when Cdeleted # DELETED
    handler = @del_cbs.shift
    handler.call(true) if handler

  when Cunknown # NOT_FOUND
    handler = @del_cbs.shift
    handler.call(false) if handler

  else
    p [:MEMCACHE_UNKNOWN, line]
  end
end
receive_data(data) click to toggle source
# File lib/em/protocols/memcache.rb, line 158
# Appends incoming data to @buffer and feeds every complete,
# delimiter-terminated line to process_cmd.
#
# When process_cmd raises ParserError, the line is put back at the front
# of the buffer and processing stops until more data arrives.
#
# @param data [String] chunk of data received from the connection
def receive_data(data)
  @buffer ||= ''
  @buffer << data

  while (eol = @buffer.index(Cdelimiter))
    # Take the line including its two-character delimiter.
    line = @buffer.slice!(0, eol + 2)
    begin
      process_cmd(line)
    rescue ParserError
      @buffer.insert(0, line)
      break
    end
  end
end
set(key, val, exptime = 0, &cb) click to toggle source

Set the value for a given key

cache.set :a, 'hello'
cache.set(:missing, 'abc'){ puts "stored the value!" }
# File lib/em/protocols/memcache.rb, line 78
# Stores +val+ under +key+.
#
# The value is converted with to_s and its byte length (falling back to
# size where bytesize is unavailable) is announced in the command line,
# followed by the payload and the line delimiter.
#
# @param key [#to_s] key to store under
# @param val [#to_s] value to store
# @param exptime [Integer] expiry value placed in the set command
# @yield [Boolean] optional; process_cmd calls it with true on STORED.
#   Without a block the command is sent with "noreply".
def set(key, val, exptime = 0, &cb)
  callback do
    payload = val.to_s
    length = payload.respond_to?(:bytesize) ? payload.bytesize : payload.size
    noreply = !cb
    send_cmd(:set, key, 0, exptime, length, noreply)
    send_data(payload)
    send_data(Cdelimiter)
    # Only queue a callback when a reply was requested.
    @set_cbs << cb if cb
  end
end
unbind() click to toggle source

@private

# File lib/em/protocols/memcache.rb, line 221
# @private
# Called when the connection goes away.
#
# If the connection had been established, or a reconnect is already in
# progress, another reconnect to @host/@port is scheduled one second later
# and the state flags are updated. If the very first connection attempt
# never completed, an error is raised instead.
#
# @raise [RuntimeError] when the server could not be reached at all
def unbind
  unless @connected || @reconnecting
    raise 'Unable to connect to memcached server'
  end

  EM.add_timer(1) { reconnect(@host, @port) }
  @connected = false
  @reconnecting = true
  # NOTE(review): presumably clears the deferrable status so that commands
  # wait for the next succeed in connection_completed; confirm against
  # EM::Deferrable.
  @deferred_status = nil
end

Private Instance Methods

send_cmd(cmd, key, flags = 0, exptime = 0, bytes = 0, noreply = false) click to toggle source
# File lib/em/protocols/memcache.rb, line 118
# Sends a storage-style command line of the form
# "<cmd> <key> <flags> <exptime> <bytes>[ noreply]\r\n".
#
# The key is normalised the same way get normalises its keys (converted to
# a string, every whitespace character replaced by "_"). Without this, a
# key containing whitespace or CR/LF would corrupt the command line and
# would be stored under a different name than the one get asks for.
#
# @param cmd [#to_s] command name, e.g. :set
# @param key [#to_s] key the command applies to
# @param flags [Integer] flags field of the command
# @param exptime [Integer] expiry field of the command
# @param bytes [Integer] length of the data block that follows
# @param noreply [Boolean] append " noreply" to the command line
def send_cmd(cmd, key, flags = 0, exptime = 0, bytes = 0, noreply = false)
  safe_key = key.to_s.gsub(/\s/, '_')
  suffix = noreply ? ' noreply' : ''
  send_data "#{cmd} #{safe_key} #{flags} #{exptime} #{bytes}#{suffix}\r\n"
end