class Fluent::Plugin::EverySenseOutput
EverySenseOutput
output data to EverySense server this module assumes the input format follows everysense output specification
Public Instance Methods
configure(conf)
click to toggle source
This method is called before starting. 'conf' is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
Calls superclass method
# File lib/fluent/plugin/out_everysense.rb, line 60 def configure(conf) super compat_parameters_convert(conf, :formatter, :inject, :buffer, default_chunk_key: "time") formatter_config = conf.elements(name: 'format').first @formatter = formatter_create(conf: formatter_config) @has_buffer_section = conf.elements(name: 'buffer').size > 0 @out_sensors = {} @sensors.each do |sensor| if sensor.input_name.nil? @out_sensors[sensor.output_name] = sensor else @out_sensors[sensor.input_name] = sensor end end log.debug @out_sensors.inspect end
force_type(value, out_sensor)
click to toggle source
# File lib/fluent/plugin/out_everysense.rb, line 88 def force_type(value, out_sensor) case out_sensor.type_of_value when "Integer" return value.to_i when "Float" return value.to_f when "String" return value.to_s else return value.to_i end end
get_out_sensor_by_index(index)
click to toggle source
# File lib/fluent/plugin/out_everysense.rb, line 136 def get_out_sensor_by_index(index) @out_sensors[@out_sensors.keys[index]] end
get_out_sensor_by_name(input_name)
click to toggle source
# File lib/fluent/plugin/out_everysense.rb, line 132 def get_out_sensor_by_name(input_name) @out_sensors[input_name] end
prefer_buffered_processing()
click to toggle source
# File lib/fluent/plugin/out_everysense.rb, line 84 def prefer_buffered_processing @has_buffer_section end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_everysense.rb, line 225 def process(tag, es) put_event_stream(tag, es) end
put_event_stream(tag, es)
click to toggle source
Emitted record inside fluentd network [
{"farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "device": [ { "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "sensor_name": "collection_data_1", "data_class_name": "AirTemperature", "data": { "at": "2016-05-12 21:38:52 UTC", "memo": null, "value": 23, "unit": "degree Celsius" } }, { "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "sensor_name": "collection_data_2", "data_class_name": "AirHygrometer", "data": { "at": "2016-05-12 21:38:52 UTC", "memo": null, "value": 30, "unit": "%RH" } } ] }
]
# File lib/fluent/plugin/out_everysense.rb, line 217 def put_event_stream(tag, es) es = inject_values_to_event_stream(tag, es) es.each do |time, record| log.debug "#{tag}, #{record}" put_message(@formatter.format(tag, time, transform_in_device(record["device"]))) end end
shutdown()
click to toggle source
This method is called when shutting down. Shutdown the thread and close sockets or files here.
Calls superclass method
# File lib/fluent/plugin/out_everysense.rb, line 238 def shutdown shutdown_proxy super end
start()
click to toggle source
This method is called when starting. Open sockets or files here.
Calls superclass method
# File lib/fluent/plugin/out_everysense.rb, line 79 def start super start_proxy end
transform_in_device(in_device)
click to toggle source
Assumed input message format is as follows
[
[ { "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "sensor_name": "collection_data_1", "data_class_name": "AirTemperature", "data": { "at": "2016-05-12 21:38:52 UTC", "memo": null, "value": 23, "unit": "degree Celsius" } }, { "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "sensor_name": "collection_data_2", "data_class_name": "AirHygrometer", "data": { "at": "2016-05-12 21:38:52 UTC", "memo": null, "value": 30, "unit": "%RH" } } ]
]
# File lib/fluent/plugin/out_everysense.rb, line 168 def transform_in_device(in_device) if in_device[0]["sensor_name"].nil? # if first input data does not include sensor_name, # output_names are used in the specified order. return in_device.map.with_index do |in_sensor, i| if !get_out_sensor_by_index(i).nil? transform_in_sensor(in_sensor, get_out_sensor_by_index(i)) end end.compact else return in_device.map do |in_sensor| #log.debug in_sensor["sensor_name"] if @out_sensors.keys.include?(in_sensor["sensor_name"]) transform_in_sensor(in_sensor, get_out_sensor_by_name(in_sensor["sensor_name"])) end end.compact end end
transform_in_sensor(in_sensor, out_sensor)
click to toggle source
output message format of EverySense is as follows
[
{ "data": { "at":"2016-04-14 17:15:00 +0900", "unit":"degree Celsius", "value":23 }, "sensor_name":"FESTIVAL_Test1_Sensor" }, { "data": { "at":"2016-04-14 17:15:00 +0900", "unit":"%RH", "value":30 }, "sensor_name":"FESTIVAL_Test1_Sensor2" }
]
# File lib/fluent/plugin/out_everysense.rb, line 121 def transform_in_sensor(in_sensor, out_sensor) # modify sensor_name { data: { at: Time.parse(in_sensor["data"]["at"]), unit: in_sensor["data"]["unit"], value: force_type(in_sensor["data"]["value"], out_sensor) }, sensor_name: out_sensor.output_name } end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_everysense.rb, line 229 def write(chunk) return if chunk.empty? tag = chunk.metadata.tag put_event_stream(tag, es) end