input { redis { batch_count => 1 #返回的事件数量,此属性仅在list模式下起做用。 data_type => "list" #logstash redis插件工做方式 key => "logstash-test-list" #监听的键值 host => "127.0.0.1" #redis地址 port => 6379 #redis端口号 password => "123qwe" #若是有安全认证,此项为密码 db => 0 #redis数据库的编号 threads => 1 #启用线程数量 } } output { stdout{} }
图不够专业,可是大体就如上图所示:java
首先是程序的自定义,这里设置了redis插件须要的参数,默认值,以及校验等。redis
而后注册Redis实例须要的信息,好比key的名字或者url等,能够看到默认的data_type是list模式。数据库
程序运行的主要入口,根据不一样的data_type,传递不一样的实现方法,而后调用listener_loop执行循环监听json
Listner_loop方法传递了两个参数,一个是监听器实现的方法,一个是处理的数据队列。循环是每秒钟执行一次,若是循环标识被设置,则退出。数组
上面的循环方法能够看到,是经过一个参数shutdown_requested来判断是否继续循环。该参数经过tear_down方法设置为true,而后根据不一样的模式,指定不一样的退出方式。
若是是list模式,则直接退出;若是是channel模式,则发送redis的unsubsribe命令退出;若是是pattern_channel,则发送punsubscribe退出。安全
在循环内部,判断是否已经建立了redis实例,若是没有建立,则调用connect方法建立;不然直接执行。服务器
这里前一段是调用Redis的new方法,初始化一个redis实例。紧接着判断batch_count是否大于1,若是等于1,就什么也不作,而后返回redis。
若是batch_count大于1,那么就调用load_batch_script方法,加载Lua脚本,存储到redis中的lua脚本字典中,供后面使用。代码以下:数据结构
上面的代码应该是这个插件最难理解的部分了。为了弄清楚这段代码的工做,须要了解下面几个知识点:less
首先,要想运行上面的脚本,必须是Redis2.6+的版本,才支持EVAL,不然会报错!EVAL命令与js中的差很少,就是能够把某一个字符串当作命令解析,其中字符串就包括lua脚本。这样有什么好处呢?ide
说白了,就是能一次性进行多个操做。好比咱们能够在脚本中写入一连串的操做,这些操做会以原子模式,一次性在服务器执行完,在返回回来。
关于lua脚本,其实没有详细研究的必要,可是必定要知道一个local和table的概念。local是建立本地的变量,这样就不会污染redis的数据。table是lua的一种数据结构,有点相似于json,能够存储数据。
另外还要知道EVAL命令的使用方法,看下面这个命令,就好理解了!EVAL "return KEYS[1] KEYS[2] ARGV[1] ARGV[2];" 2 name:xing age:13
就会返回:
name age xing 13
这段代码没有通过真正的操做,可是有助于理解就好!也就是说,EVAL后面跟着一段脚本,脚本后面跟着的就是参数,能够经过KEYS和ARGV数组得到,可是下标从1开始。
再来讲说EVAL命令,它的执行过程以下:
有了这些理论基础之后,就能够看看上面的代码都作了什么了!
首先是获取参数,这个参数赋值给i;而后建立了一个对象res;紧接着调用llen命令,得到指定list的长度;若是list的长度大于i,则什么也不作;若是小于i,那么i就等于lenth;而后执行命令lpop,取出list中的元素,一共取i次,放入res中,最后返回。
说得通俗点,就是比较一下list元素个数与设置batch_count的值。若是batch_count为5,列表list中有5条以上的数据,那么直接取5条,一次性返回;不然取length条返回。
能够看到这段脚本的做用,就是让logstash一次请求,最多得到batch_count条事件,减少了服务器处理请求的压力。
讲完这段代码,能够看看不一样的工做模式的实现代码了:
首先是list的代码,其实就是执行BLPOP命令,获取数据。若是在list模式中,还会去判断batch_count的值,若是是1直接退出;若是大于1,则使用evalsha命令调用以前保存的脚本方法。
至于channel和pattern_channel,就没啥解释的了,就是分别调用subscribe和psubsribe命令而已。
其实最难理解的,就是中间那段lua脚本~明白它的用处,redis插件也就不难理解了。
# encoding: utf-8 require "logstash/inputs/base" require "logstash/inputs/threadable" require "logstash/namespace" # This input will read events from a Redis instance; it supports both Redis channels and lists. # The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and # the channel commands used by Logstash are found in Redis v1.3.8+. # While you may be able to make these Redis versions work, the best performance # and stability will be found in more recent stable versions. Versions 2.6.0+ # are recommended. # # For more information about Redis, see <http://redis.io/> # # `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or # newer. Anything older does not support the operations used by batching. # class LogStash::Inputs::Redis < LogStash::Inputs::Threadable config_name "redis" default :codec, "json" # The `name` configuration is used for logging in case there are multiple instances. # This feature has no real function and will be removed in future versions. config :name, :validate => :string, :default => "default", :deprecated => true # The hostname of your Redis server. config :host, :validate => :string, :default => "127.0.0.1" # The port to connect on. config :port, :validate => :number, :default => 6379 # The Redis database number. config :db, :validate => :number, :default => 0 # Initial connection timeout in seconds. config :timeout, :validate => :number, :default => 5 # Password to authenticate with. There is no authentication by default. config :password, :validate => :password # The name of the Redis queue (we'll use BLPOP against this). # TODO: remove soon. config :queue, :validate => :string, :deprecated => true # The name of a Redis list or channel. # TODO: change required to true config :key, :validate => :string, :required => false # Specify either list or channel. If `redis\_type` is `list`, then we will BLPOP the # key. If `redis\_type` is `channel`, then we will SUBSCRIBE to the key. # If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key. # TODO: change required to true config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => false # The number of events to return from Redis using EVAL. config :batch_count, :validate => :number, :default => 1 public def register require 'redis' @redis = nil @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}" # TODO remove after setting key and data_type to true if @queue if @key or @data_type raise RuntimeError.new( "Cannot specify queue parameter and key or data_type" ) end @key = @queue @data_type = 'list' end if not @key or not @data_type raise RuntimeError.new( "Must define queue, or key and data_type parameters" ) end # end TODO @logger.info("Registering Redis", :identity => identity) end # def register # A string used to identify a Redis instance in log messages # TODO(sissel): Use instance variables for this once the @name config # option is removed. private def identity @name || "#{@redis_url} #{@data_type}:#{@key}" end private def connect redis = Redis.new( :host => @host, :port => @port, :timeout => @timeout, :db => @db, :password => @password.nil? ? nil : @password.value ) load_batch_script(redis) if @data_type == 'list' && (@batch_count > 1) return redis end # def connect private def load_batch_script(redis) #A Redis Lua EVAL script to fetch a count of keys #in case count is bigger than current items in queue whole queue will be returned without extra nil values redis_script = <<EOF local i = tonumber(ARGV[1]) local res = {} local length = redis.call('llen',KEYS[1]) if length < i then i = length end while (i > 0) do local item = redis.call("lpop", KEYS[1]) if (not item) then break end table.insert(res, item) i = i-1 end return res EOF @redis_script_sha = redis.script(:load, redis_script) end private def queue_event(msg, output_queue) begin @codec.decode(msg) do |event| decorate(event) output_queue << event end rescue LogStash::ShutdownSignal => e # propagate up raise(e) rescue => e # parse or event creation error @logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace); end end private def list_listener(redis, output_queue) item = redis.blpop(@key, 0, :timeout => 1) return unless item # from timeout or other conditions # blpop returns the 'key' read from as well as the item result # we only care about the result (2nd item in the list). queue_event(item[1], output_queue) # If @batch_count is 1, there's no need to continue. return if @batch_count == 1 begin redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]).each do |item| queue_event(item, output_queue) end # Below is a commented-out implementation of 'batch fetch' # using pipelined LPOP calls. This in practice has been observed to # perform exactly the same in terms of event throughput as # the evalsha method. Given that the EVALSHA implementation uses # one call to Redis instead of N (where N == @batch_count) calls, # I decided to go with the 'evalsha' method of fetching N items # from Redis in bulk. #redis.pipelined do #error, item = redis.lpop(@key) #(@batch_count-1).times { redis.lpop(@key) } #end.each do |item| #queue_event(item, output_queue) if item #end # --- End commented out implementation of 'batch fetch' rescue Redis::CommandError => e if e.to_s =~ /NOSCRIPT/ then @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e); load_batch_script(redis) retry else raise e end end end private def channel_listener(redis, output_queue) redis.subscribe @key do |on| on.subscribe do |channel, count| @logger.info("Subscribed", :channel => channel, :count => count) end on.message do |channel, message| queue_event message, output_queue end on.unsubscribe do |channel, count| @logger.info("Unsubscribed", :channel => channel, :count => count) end end end private def pattern_channel_listener(redis, output_queue) redis.psubscribe @key do |on| on.psubscribe do |channel, count| @logger.info("Subscribed", :channel => channel, :count => count) end on.pmessage do |ch, event, message| queue_event message, output_queue end on.punsubscribe do |channel, count| @logger.info("Unsubscribed", :channel => channel, :count => count) end end end # Since both listeners have the same basic loop, we've abstracted the outer # loop. private def listener_loop(listener, output_queue) while !@shutdown_requested begin @redis ||= connect self.send listener, @redis, output_queue rescue Redis::BaseError => e @logger.warn("Redis connection problem", :exception => e) # Reset the redis variable to trigger reconnect @redis = nil sleep 1 end end end # listener_loop public def run(output_queue) if @data_type == 'list' listener_loop :list_listener, output_queue elsif @data_type == 'channel' listener_loop :channel_listener, output_queue else listener_loop :pattern_channel_listener, output_queue end rescue LogStash::ShutdownSignal # ignore and quit end # def run public def teardown @shutdown_requested = true if @redis if @data_type == 'list' @redis.quit rescue nil elsif @data_type == 'channel' @redis.unsubscribe rescue nil @redis.connection.disconnect elsif @data_type == 'pattern_channel' @redis.punsubscribe rescue nil @redis.connection.disconnect end @redis = nil end end end # class LogStash::Inputs::Redis