class ICFS::CacheElastic
Implements {ICFS::Cache Cache} using Elasticsearch
Constants
- DefaultSize
default page size
- Maps
The ES mappings for all of the indexes
- ResultsCase
the Case results fields
- ResultsEntry
Entry search results fields
- ResultsIndex
Index search results fields
- ResultsLog
Log search results fields
Public Class Methods
New instance
@param map [Hash] Symbol to String of the indexes. Must provide
:case, :log, :entry, :action, :current, and :lock
@param es [Faraday] Faraday instance to the Elasticsearch cluster
# File lib/icfs/cache_elastic.rb, line 163
#
# Keep the index map and the Elasticsearch connection, and build a frozen
# "hostname:pid" name for this process.
#
# @param map [Hash] Symbol to String of the indexes
# @param es [Faraday] Faraday instance to the Elasticsearch cluster
def initialize(map, es)
  @es = es
  @map = map
  @name = format('%s:%d', Socket.gethostname, Process.pid).freeze
end
Public Instance Methods
filter bucket aggregation
# File lib/icfs/cache_elastic.rb, line 872
#
# Build a `filter` bucket aggregation.
#
# @param name [String] key the aggregation is stored under
# @param qu [Hash] query used as the filter
# @param sub [Hash, nil] sub-aggregations, attached under 'aggs' when truthy
# @return [Hash] the aggregation hash
def _agg_filter(name, qu, sub)
  body = { 'filter' => qu }
  body['aggs'] = sub if sub
  { name => body }
end
nested bucket aggregation
# File lib/icfs/cache_elastic.rb, line 882
#
# Build a `nested` bucket aggregation.
#
# @param name [String] key the aggregation is stored under
# @param field [String] value used as the nested 'path'
# @param sub [Hash, nil] sub-aggregations, attached under 'aggs' when truthy
# @return [Hash] the aggregation hash
def _agg_nested(name, field, sub)
  body = { 'nested' => { 'path' => field } }
  body['aggs'] = sub if sub
  { name => body }
end
stats metric aggregation
# File lib/icfs/cache_elastic.rb, line 854
#
# Build a `stats` metric aggregation.
#
# @param name [String] key the aggregation is stored under
# @param field [String] field the stats are computed on
# @return [Hash] the aggregation hash
def _agg_stats(name, field)
  metric = { 'field' => field }
  { name => { 'stats' => metric } }
end
terms bucket aggregation
# File lib/icfs/cache_elastic.rb, line 862
#
# Build a `terms` bucket aggregation.
#
# @param name [String] key the aggregation is stored under
# @param field [String] field to bucket on
# @param sub [Hash, nil] sub-aggregations, attached under 'aggs' when truthy
# @return [Hash] the aggregation hash
def _agg_terms(name, field, sub)
  body = { 'terms' => { 'field' => field } }
  body['aggs'] = sub if sub
  { name => body }
end
match all query
# File lib/icfs/cache_elastic.rb, line 256
#
# Build a `match_all` query.
#
# @return [Hash] a new query hash on every call
def _query_all
  everything = {}
  { 'match_all' => everything }
end
bool query
# File lib/icfs/cache_elastic.rb, line 941
#
# Build a `bool` query from its four clause lists. Clause lists that are
# nil/false or empty are left out; when nothing remains a `match_all`
# query is returned instead.
#
# @param must [Array, nil] 'must' clauses
# @param filter [Array, nil] 'filter' clauses
# @param should [Array, nil] 'should' clauses
# @param must_not [Array, nil] 'must_not' clauses
# @return [Hash] the query hash
def _query_bool(must, filter, should, must_not)
  clauses = {
    'must' => must,
    'filter' => filter,
    'should' => should,
    'must_not' => must_not,
  }.reject { |_kind, list| !list || list.empty? }

  return { 'match_all' => {} } if clauses.empty?

  { 'bool' => clauses }
end
constant score
# File lib/icfs/cache_elastic.rb, line 1020
#
# Wrap a filter in a `constant_score` query.
#
# @param filter [Hash] the filter query
# @return [Hash] the query hash
def _query_constant(filter)
  wrapped = { 'filter' => filter }
  { 'constant_score' => wrapped }
end
Exists query
# File lib/icfs/cache_elastic.rb, line 901
#
# Build an `exists` query for a field.
#
# @param field [String] the field that must exist
# @param val [Object, nil] only tested for nil; any non-nil value
#   (including false) produces the query
# @return [Hash, nil] the query hash, or nil when val is nil
def _query_exists(field, val)
  val.nil? ? nil : { 'exists' => { 'field' => field } }
end
keyword query
# File lib/icfs/cache_elastic.rb, line 909
#
# Build a keyword query: `terms` when val is an Array, `term` otherwise.
#
# @param field [String] the field to match
# @param val [Object, Array, nil] value or list of values
# @return [Hash, nil] the query hash, or nil when val is nil
def _query_keyw(field, val)
  return nil if val.nil?

  kind = val.is_a?(Array) ? 'terms' : 'term'
  { kind => { field => val } }
end
match query
# File lib/icfs/cache_elastic.rb, line 247
#
# Build a `match` query.
#
# @param field [String] the field to match
# @param val [Object, nil] the query text
# @return [Hash, nil] the query hash, or nil when val is nil or false
def _query_match(field, val)
  return nil unless val

  clause = { 'query' => val }
  { 'match' => { field => clause } }
end
Nested query
# File lib/icfs/cache_elastic.rb, line 454
#
# Build a `nested` query.
#
# @param field [String] value used as the nested 'path'
# @param query [Hash] the query to run against the nested documents
# @return [Hash] the query hash
def _query_nested(field, query)
  inner = { 'path' => field, 'query' => query }
  { 'nested' => inner }
end
prefix string query
# File lib/icfs/cache_elastic.rb, line 933
#
# Build a `prefix` query.
#
# @param field [String] the field to match
# @param val [String, nil] the prefix
# @return [Hash, nil] the query hash, or nil when val is nil
def _query_prefix(field, val)
  val.nil? ? nil : { 'prefix' => { field => val } }
end
Term query
# File lib/icfs/cache_elastic.rb, line 892
#
# Build a `term` query.
#
# @param field [String] the field to match
# @param val [Object, nil] the exact value
# @return [Hash, nil] the query hash, or nil when val is nil
def _query_term(field, val)
  { 'term' => { field => val } } unless val.nil?
end
times query
# File lib/icfs/cache_elastic.rb, line 922
#
# Build a `range` query with exclusive 'gt'/'lt' bounds. A bound that is
# nil or false is left out of the range.
#
# @param field [String] the field to match
# @param val_gt [Object, nil] lower bound
# @param val_lt [Object, nil] upper bound
# @return [Hash, nil] the query hash, or nil when both bounds are nil
def _query_times(field, val_gt, val_lt)
  return nil if val_gt.nil? && val_lt.nil?

  bounds = { 'gt' => val_gt, 'lt' => val_lt }.select { |_op, bound| bound }
  { 'range' => { field => bounds } }
end
(see Cache#action_read)
# File lib/icfs/cache_elastic.rb, line 551
#
# Read an action through _read, using the id "<cid>.<anum>".
#
# @param cid [String] case ID
# @param anum [Integer] action number
# @return whatever _read returns
def action_read(cid, anum)
  doc_id = format('%s.%d', cid, anum)
  _read(:action, doc_id)
end
(see Cache#action_search)
# File lib/icfs/cache_elastic.rb, line 567
#
# Search the :action index. Task criteria are matched against the nested
# 'tasks' documents; the case ID is applied as a top-level filter.
#
# @param query [Hash] search options. This method reads :title, :assigned,
#   :status, :flag, :after, :before, :tags, :caseid and :sort; the hash is
#   also handed to _page and _results.
# @return whatever _results returns
# @raise [RuntimeError] 'search failed' when the response is not a success
def action_search(query)
  assigned = query[:assigned]

  # clauses evaluated against each nested task
  task_must = [_query_match('tasks.title', query[:title])].compact
  task_filter = [
    _query_term('tasks.assigned', assigned),
    _query_term('tasks.status', query[:status]),
    _query_term('tasks.flag', query[:flag]),
    _query_times('tasks.time', query[:after], query[:before]),
    _query_term('tasks.tags', query[:tags]),
  ].compact
  tasks = _query_nested('tasks', _query_bool(task_must, task_filter, nil, nil))

  by_case = [_query_term('caseid', query[:caseid])].compact
  req = { 'query' => _query_bool([tasks], by_case, nil, nil) }

  # sort: explicit direction, else newest first unless this is a title search
  order =
    case query[:sort]
    when 'time_desc' then 'desc'
    when 'time_asc' then 'asc'
    else query[:title] ? nil : 'desc'
    end
  if order
    task_time = {
      'order' => order,
      'nested' => {
        'path' => 'tasks',
        'filter' => _query_term('tasks.assigned', assigned),
      },
    }
    req['sort'] = [
      { 'tasks.time' => task_time },
      { '_id' => { 'order' => 'desc' } },
    ]
  end

  # paging
  _page(query, req)

  # run the search
  resp = @es.run_request(
    :get,
    @map[:action] + '/_search',
    JSON.generate(req),
    { 'Content-Type' => 'application/json' }
  )
  raise 'search failed' unless resp.success?

  _results(resp, query) do |src|
    # NOTE(review): assumes every hit has a task whose 'assigned' equals
    # query[:assigned]; if none matches, task is nil and the lookups raise.
    task = src['tasks'].find { |tk| tk['assigned'] == assigned }
    {
      caseid: src['caseid'],
      action: src['action'],
      status: task['status'],
      flag: task['flag'],
      title: task['title'],
      time: task['time'],
      tags: task['tags'],
    }
  end
end
(see Cache#action_write)
# File lib/icfs/cache_elastic.rb, line 559
#
# Write an action through _write, using the id "<cid>.<anum>".
#
# @param cid [String] case ID
# @param anum [Integer] action number
# @param item the item passed on to _write
# @return whatever _write returns
def action_write(cid, anum, item)
  doc_id = format('%s.%d', cid, anum)
  _write(:action, doc_id, item)
end
(see Cache#case_read)
# File lib/icfs/cache_elastic.rb, line 231
#
# Read a case through _read, keyed by the case ID.
#
# @param cid [String] case ID
# @return whatever _read returns
def case_read(cid)
  index = :case
  _read(index, cid)
end
(see Cache#case_search)
# File lib/icfs/cache_elastic.rb, line 264
#
# Search the :case index.
#
# @param query [Hash] search options. This method reads :title, :tags,
#   :status, :template, :grantee and :perm; the hash is also handed to
#   _page and _results.
# @return whatever _results returns for the ResultsCase fields
# @raise [RuntimeError] 'search failed' when the response is not a success
def case_search(query)
  title = query[:title]

  # scored text match
  must = [_query_match('title', title)].compact

  # exact-value filters
  filter = [
    _query_term('tags', query[:tags]),
    _query_term('status', query[:status]),
    _query_term('template', query[:template]),
  ].compact

  # access criteria are matched against the nested 'access' documents
  access = [
    _query_term('access.grant', query[:grantee]),
    _query_term('access.perm', query[:perm]),
  ].compact
  unless access.empty?
    nested =
      if access.size == 1
        access.first
      else
        _query_bool(nil, access, nil, nil)
      end
    filter << _query_nested('access', nested)
  end

  req = { 'query' => _query_bool(must, filter, nil, nil) }

  # highlight the title only when it was searched
  req['highlight'] = { 'fields' => { 'title' => {} } } if title

  # without a title search, order by case ID
  req['sort'] = { 'caseid' => 'asc' } unless title

  # paging
  _page(query, req)

  # run the search
  resp = @es.run_request(
    :get,
    @map[:case] + '/_search',
    JSON.generate(req),
    { 'Content-Type' => 'application/json' }
  )
  raise 'search failed' unless resp.success?

  _results(resp, query, ResultsCase)
end
(see Cache#case_write)
# File lib/icfs/cache_elastic.rb, line 239
#
# Write a case through _write, keyed by the case ID.
#
# @param cid [String] case ID
# @param item the item passed on to _write
# @return whatever _write returns
def case_write(cid, item)
  index = :case
  _write(index, cid, item)
end
(see Cache#current_read)
# File lib/icfs/cache_elastic.rb, line 215
#
# Read the :current record of a case through _read.
#
# @param cid [String] case ID
# @return whatever _read returns
def current_read(cid)
  index = :current
  _read(index, cid)
end
(see Cache#current_write)
# File lib/icfs/cache_elastic.rb, line 223
#
# Write the :current record of a case through _write.
#
# @param cid [String] case ID
# @param item the item passed on to _write
# @return whatever _write returns
def current_write(cid, item)
  index = :current
  _write(index, cid, item)
end
(see Cache#entry_read)
# File lib/icfs/cache_elastic.rb, line 438
#
# Read an entry through _read, using the id "<cid>.<enum>".
#
# @param cid [String] case ID
# @param enum [Integer] entry number
# @return whatever _read returns
def entry_read(cid, enum)
  doc_id = format('%s.%d', cid, enum)
  _read(:entry, doc_id)
end
(see Cache#entry_search)
# File lib/icfs/cache_elastic.rb, line 467
#
# Search the :entry index.
#
# @param query [Hash] search options. This method reads :title, :content,
#   :tags, :caseid, :after, :before, :action, :index, :stat, :credit and
#   :sort; the hash is also handed to _page and _results.
# @return whatever _results returns for the ResultsEntry fields
# @raise [RuntimeError] 'search failed' when the response is not a success
def entry_search(query)
  # scored text matches
  must = [
    _query_match('title', query[:title]),
    _query_match('content', query[:content]),
  ].compact

  # exact-value and time-range filters
  filter = [
    _query_term('tags', query[:tags]),
    _query_term('caseid', query[:caseid]),
    _query_times('time', query[:after], query[:before]),
    _query_term('action', query[:action]),
    _query_term('index', query[:index]),
  ].compact

  # stat criteria are matched against the nested 'stats' documents
  stats = [
    _query_term('stats.name', query[:stat]),
    _query_term('stats.credit', query[:credit]),
  ].compact
  unless stats.empty?
    nested =
      if stats.size == 1
        stats.first
      else
        _query_bool(nil, stats, nil, nil)
      end
    filter << _query_nested('stats', nested)
  end

  req = { 'query' => _query_bool(must, filter, nil, nil) }

  # highlight whichever text fields were searched
  hl = {}
  %w[title content].each { |fld| hl[fld] = {} if query[fld.to_sym] }
  req['highlight'] = { 'fields' => hl } unless hl.empty?

  # sort: explicit direction; with no :sort, newest first unless this is a
  # text search; any other :sort value adds no sort
  text_search = query[:title] || query[:content]
  order =
    case query[:sort]
    when 'time_desc' then 'desc'
    when 'time_asc' then 'asc'
    when nil then text_search ? nil : 'desc'
    end
  req['sort'] = [{ 'time' => order }, { '_id' => 'desc' }] if order

  # paging
  _page(query, req)

  # run the search
  resp = @es.run_request(
    :get,
    @map[:entry] + '/_search',
    JSON.generate(req),
    { 'Content-Type' => 'application/json' }
  )
  raise 'search failed' unless resp.success?

  _results(resp, query, ResultsEntry)
end
(see Cache#entry_write)
# File lib/icfs/cache_elastic.rb, line 446
#
# Write an entry through _write, using the id "<cid>.<enum>".
#
# @param cid [String] case ID
# @param enum [Integer] entry number
# @param item the item passed on to _write
# @return whatever _write returns
def entry_write(cid, enum, item)
  doc_id = format('%s.%d', cid, enum)
  _write(:entry, doc_id, item)
end
(see Cache#index_read)
# File lib/icfs/cache_elastic.rb, line 652
#
# Read an index through _read, using the id "<cid>.<xnum>".
#
# @param cid [String] case ID
# @param xnum [Integer] index number
# @return whatever _read returns
def index_read(cid, xnum)
  doc_id = format('%s.%d', cid, xnum)
  _read(:index, doc_id)
end
(see Cache#index_search)
# File lib/icfs/cache_elastic.rb, line 659
#
# Search the :index index.
#
# @param query [Hash] search options. This method reads :title, :content,
#   :caseid, :tags, :prefix and :sort; the hash is also handed to _page
#   and _results.
# @return whatever _results returns for the ResultsIndex fields
# @raise [RuntimeError] 'search failed' when the response is not a success
def index_search(query)
  # scored text matches
  must = [
    _query_match('title', query[:title]),
    _query_match('content', query[:content]),
  ].compact

  # exact-value and prefix filters
  filter = [
    _query_term('caseid', query[:caseid]),
    _query_term('tags', query[:tags]),
    _query_prefix('title.raw', query[:prefix]),
  ].compact

  req = { 'query' => _query_bool(must, filter, nil, nil) }

  # highlight whichever text fields were searched
  hl = {}
  %w[title content].each { |fld| hl[fld] = {} if query[fld.to_sym] }
  req['highlight'] = { 'fields' => hl } unless hl.empty?

  # sort: explicit field/direction, else by title ascending when this is
  # not a title/content query
  sort_field, order =
    case query[:sort]
    when 'index_asc' then %w[index asc]
    when 'index_desc' then %w[index desc]
    when 'title_desc' then %w[title.raw desc]
    when 'title_asc' then %w[title.raw asc]
    else must.empty? ? %w[title.raw asc] : nil
    end
  req['sort'] = [{ sort_field => order }, { '_id' => 'desc' }] if sort_field

  # paging
  _page(query, req)

  # run the search
  resp = @es.run_request(
    :get,
    @map[:index] + '/_search',
    JSON.generate(req),
    { 'Content-Type' => 'application/json' }
  )
  raise 'search failed' unless resp.success?

  _results(resp, query, ResultsIndex)
end
(see Cache#index_write)
# File lib/icfs/cache_elastic.rb, line 644
#
# Write an index through _write, using the id "<cid>.<xnum>".
#
# @param cid [String] case ID
# @param xnum [Integer] index number
# @param item the item passed on to _write
# @return whatever _write returns
def index_write(cid, xnum, item)
  doc_id = format('%s.%d', cid, xnum)
  _write(:index, doc_id, item)
end
(see Cache#lock_release)
# File lib/icfs/cache_elastic.rb, line 203
#
# Release a case lock by deleting the lock document for the case.
#
# @param cid [String] case ID; CGI-escaped to form the document id
# @return [nil]
# @raise [RuntimeError] when the delete request is not a success
def lock_release(cid)
  url = format('%s/_doc/%s', @map[:lock], CGI.escape(cid))
  resp = @es.run_request(:delete, url, '', {})
  return if resp.success?

  raise(format('Elasticsearch lock release failed: %s', cid))
end
(see Cache#lock_take)
# File lib/icfs/cache_elastic.rb, line 180
#
# Take a case lock by creating the lock document for the case. The create
# request is attempted up to five times, pausing 0.1s between attempts.
# There is no pause after the final failed attempt, so the failure is
# reported immediately.
#
# @param cid [String] case ID; CGI-escaped to form the document id
# @return [true] once a create request succeeds
# @raise [RuntimeError] when every attempt fails
def lock_take(cid)
  json = '{"client":"%s"}' % @name
  url = '%s/_doc/%s/_create' % [@map[:lock], CGI.escape(cid)]
  head = { 'Content-Type' => 'application/json' }.freeze

  # try to take
  tries = 5
  tries.times do |attempt|
    resp = @es.run_request(:put, url, json, head)
    return true if resp.success?

    # only wait when another attempt follows
    sleep(0.1) if attempt < tries - 1
  end

  # failed to take lock
  raise('Elasticsearch lock take failed: %s' % cid)
end
(see Cache#log_read)
# File lib/icfs/cache_elastic.rb, line 779
#
# Read a log through _read, using the id "<cid>.<lnum>".
#
# @param cid [String] case ID
# @param lnum [Integer] log number
# @return whatever _read returns
def log_read(cid, lnum)
  doc_id = format('%s.%d', cid, lnum)
  _read(:log, doc_id)
end
(see Cache#log_search)
# File lib/icfs/cache_elastic.rb, line 809
#
# Search the :log index. All criteria are applied as filters.
#
# @param query [Hash] search options. This method reads :caseid, :after,
#   :before, :user, :case_edit, :entry, :index, :action and :sort; the
#   hash is also handed to _page and _results.
# @return whatever _results returns for the ResultsLog fields
# @raise [RuntimeError] 'search failed' when the response is not a success
def log_search(query)
  filter = [
    _query_term('caseid', query[:caseid]),
    _query_times('time', query[:after], query[:before]),
    _query_term('user', query[:user]),
    _query_exists('case.set', query[:case_edit]),
    _query_term('entry.num', query[:entry]),
    _query_term('index.num', query[:index]),
    _query_term('action.num', query[:action]),
  ].compact
  req = { 'query' => _query_bool(nil, filter, nil, nil) }

  # sort: newest first by default; any other :sort value adds no sort
  order =
    case query[:sort]
    when 'time_desc', nil then 'desc'
    when 'time_asc' then 'asc'
    end
  req['sort'] = [{ 'time' => order }, { '_id' => 'desc' }] if order

  # paging
  _page(query, req)

  # run the search
  resp = @es.run_request(
    :get,
    @map[:log] + '/_search',
    JSON.generate(req),
    { 'Content-Type' => 'application/json' }
  )
  raise 'search failed' unless resp.success?

  _results(resp, query, ResultsLog)
end
(see Cache#log_write)
# File lib/icfs/cache_elastic.rb, line 787
#
# Write a log through _write, using the id "<cid>.<lnum>".
#
# @param cid [String] case ID
# @param lnum [Integer] log number
# @param item the item passed on to _write
# @return whatever _write returns
def log_write(cid, lnum, item)
  doc_id = format('%s.%d', cid, lnum)
  _write(:log, doc_id, item)
end
(see Cache#stats)
# File lib/icfs/cache_elastic.rb, line 958
#
# Aggregate the stats stored on entries. Runs a zero-hit search against
# the :entry index with a nested aggregation over 'stats', bucketed by
# 'stats.name' and, when :credit is given, first filtered on
# 'stats.credit'.
#
# @param query [Hash] options; reads :credit, :caseid, :after and :before
# @return [Hash] { query:, list: } where each list item is
#   { object: { stat:, sum:, count:, min:, max: } }
# @raise [RuntimeError] 'search failed' when the response is not a success
def stats(query)
  credit = query[:credit]

  # aggregations, built from the innermost outwards
  ag = _agg_terms('stats', 'stats.name', _agg_stats('vals', 'stats.value'))
  ag = _agg_filter('credit', _query_term('stats.credit', credit), ag) if credit
  ag = _agg_nested('nested', 'stats', ag)

  # which entries are aggregated
  filt = [
    _query_term('caseid', query[:caseid]),
    _query_times('time', query[:after], query[:before]),
  ].compact

  # size 0: only the aggregations are wanted
  req = {
    'query' => _query_bool(nil, filt, nil, nil),
    'aggs' => ag,
    'size' => 0,
  }

  # run the search
  resp = @es.run_request(
    :get,
    @map[:entry] + '/_search',
    JSON.generate(req),
    { 'Content-Type' => 'application/json' }
  )
  raise 'search failed' unless resp.success?

  # the credit filter adds one more level to the aggregation result
  nested = JSON.parse(resp.body)['aggregations']['nested']
  nested = nested['credit'] if credit
  buckets = nested['stats']['buckets']

  list = buckets.map do |bucket|
    vals = bucket['vals']
    {
      object: {
        stat: bucket['key'],
        sum: vals['sum'],
        count: vals['count'],
        min: vals['min'],
        max: vals['max'],
      }
    }
  end

  { query: query, list: list }
end
Private Instance Methods
Do paging
@param query [Hash] the query
@param req [Hash] the constructed ES request
# File lib/icfs/cache_elastic.rb, line 417
#
# Add paging to a request: sets req['size'] and req['from'].
#
# Missing, zero or negative values fall back to the defaults (DefaultSize
# results per page, page 1), so the request never carries a negative
# 'size' or 'from'.
#
# @param query [Hash] the query; reads :size and :page, converted with to_i
# @param req [Hash] the constructed ES request, modified in place
# @return [Integer] the 'from' offset that was set
def _page(query, req)
  # size defaults
  size = (query[:size] || 0).to_i
  size = DefaultSize unless size.positive?

  # page defaults to 1
  page = (query[:page] || 0).to_i
  page = 1 unless page.positive?

  req['size'] = size
  req['from'] = (page - 1) * size
end
Process search results
@param resp [#body] the response from Elasticsearch; its body is parsed as JSON
@param query [Hash] the original request
@param fields [Hash] Fields to return
@yield [src] The source object
@yieldreturn [Hash] the search result object
# File lib/icfs/cache_elastic.rb, line 328
#
# Process search results.
#
# The hit count is taken from hits.total. That value is a plain number on
# older Elasticsearch versions and an object with a 'value' key on newer
# ones; both forms are accepted and a number is always returned.
#
# @param resp [#body] the response from Elasticsearch; body is JSON text
# @param query [Hash] the original request; :size is read, and the hash is
#   echoed back in the result
# @param fields [Hash, nil] Fields to return: result key => source field
#   name, or => [source field, option, (sub key)] with option one of
#   :sub, :size, :zero, :empty. When nil, the block builds the object.
# @yield [src] The source object of one hit (only used when fields is nil)
# @yieldreturn [Hash] the search result object
# @return [Hash] { query:, hits:, size:, list: } where each list item is
#   { score:, snippet:, object: }
# @raise [ArgumentError] when a fields entry has an unknown option
def _results(resp, query, fields=nil)
  # size defaults; a missing, zero or negative size reports the default
  size = (query[:size] || 0).to_i
  size = DefaultSize unless size.positive?

  rh = JSON.parse(resp.body)

  # hits.total may be a number or { 'value' => n, 'relation' => ... }
  total = rh['hits']['total']
  total = total['value'] if total.is_a?(Hash)

  # process each result
  list = rh['hits']['hits'].map do |hh|
    src = hh['_source']

    # join every highlight fragment of every field into one snippet
    hl = hh['highlight']
    snip = nil
    if hl
      snip = String.new
      hl.each_value { |frags| frags.each { |frag| snip << frag } }
    end

    # fields provided, else pass the source to the block
    obj = fields ? _results_object(src, fields) : yield(src)

    {
      score: hh['_score'],
      snippet: snip,
      object: obj,
    }
  end

  {
    query: query,
    hits: total,
    size: size,
    list: list,
  }
end

# Build one result object from a hit's source using a fields description.
#
# @param src [Hash] the '_source' of a hit
# @param fields [Hash] result key => source field name or option Array
# @return [Hash] the result object
# @raise [ArgumentError] when an option Array has an unknown option
def _results_object(src, fields)
  fields.each_with_object({}) do |(key, spec), obj|
    # plain field name: copy the value as is
    unless spec.is_a?(Array)
      obj[key] = src[spec]
      next
    end

    val = src[spec[0]]
    obj[key] =
      case spec[1]
      when :sub then val.nil? ? 0 : val[spec[2]]   # a sub value
      when :size then val.nil? ? 0 : val.size      # size of a value
      when :zero then val.nil? ? 0 : val           # zero for nil
      when :empty then val.nil? ? [] : val         # empty array for nil
      else raise(ArgumentError, 'Not a valid field option')
      end
  end
end