-- Directory layout: every path used below is derived from topdir.
topdir = "/tmp"
rootdir = topdir .. "/data"

-- Initialize the site from rootdir when no site configuration is available
-- yet; abort if that fails.
if not silk.site.have_site_config() then
  if not silk.site.init_site(nil, rootdir, true) then
    error("The silk.conf file was not found")
  end
end

-- Format used for storing files
local file_info = {
  -- assume all data is IPv4
  record_format = silk.file_format_id("FT_RWGENERIC"),
}

-- Convenience variable for mapping flowtype values
local ft_name_id_map = {
  inweb = silk.site.flowtype_id("iw"),
  in_nonweb = silk.site.flowtype_id("in"),
  innull = silk.site.flowtype_id("innull"),
  outweb = silk.site.flowtype_id("ow"),
  out_nonweb = silk.site.flowtype_id("out"),
  outnull = silk.site.flowtype_id("outnull"),
  ext2ext = silk.site.flowtype_id("ext2ext"),
  int2int = silk.site.flowtype_id("int2int"),
  other = silk.site.flowtype_id("other"),
}

-- A missing time field is set to the UNIX epoch
local epoch = silk.datetime(0)

-- -- keep track of schemas (templates) we have seen.  be careful about
-- -- caching schemas when reading UDP data since the schemas will never
-- -- be garbage collected.
-- local schema_cache = {}

-- Choose the flowtype pair for a record from its source and destination
-- addresses.
--
-- probe: probe definition; probe.external and probe.internal are indexed
--        by address to test membership.
-- rec:   the forward record; its sip and dip fields are read.
--
-- Returns two flowtype ids: the first for the forward record, the second
-- for the reverse record.
local function classify_flowtypes(probe, rec)
  local saddr = rec.sip
  local daddr = rec.dip

  if probe.external[saddr] then
    -- Came from an external address and...
    if probe.internal[daddr] then
      -- ...went to an internal address (incoming)
      if silk.rwRec_is_web(rec) then
        return ft_name_id_map.inweb, ft_name_id_map.outweb
      end
      return ft_name_id_map.in_nonweb, ft_name_id_map.out_nonweb
    elseif probe.external[daddr] then
      -- ...went back to an external address (external to external)
      return ft_name_id_map.ext2ext, ft_name_id_map.ext2ext
    end
    -- went to an unexpected location
    return ft_name_id_map.other, ft_name_id_map.other
  elseif probe.internal[saddr] then
    -- Came from an internal address and...
    if probe.external[daddr] then
      -- ...went to an external address (outgoing)
      if silk.rwRec_is_web(rec) then
        return ft_name_id_map.outweb, ft_name_id_map.inweb
      end
      return ft_name_id_map.out_nonweb, ft_name_id_map.in_nonweb
    elseif probe.internal[daddr] then
      -- ...went to another internal address (internal to internal).
      -- The destination must be tested here: the source is already known
      -- to be internal, so testing it again would always succeed and the
      -- "unexpected location" case below could never be reached.
      return ft_name_id_map.int2int, ft_name_id_map.int2int
    end
    -- went to an unexpected location
    return ft_name_id_map.other, ft_name_id_map.other
  end

  -- source is neither external nor internal
  return ft_name_id_map.other, ft_name_id_map.other
end

-- Given a probe definition, an rwRec, and the corresponding fixrec,
-- write the rwRec to appropriate outputs.
--
-- probe:   probe definition; external, internal and repo_key are used.
-- fwd_rec: forward record; always written unless the event is ignored.
-- rev_rec: reverse record, or nil when there is none.
-- fixrec:  source record, consulted for the firewall event and for
--          observationTimeMilliseconds.
--
-- Returns nothing.  Records whose firewall event is neither 2 nor 3 are
-- dropped without being written.
function pack_function (probe, fwd_rec, rev_rec, fixrec)
  -- -- uncomment these lines to print the schema
  -- local s = silk.fixrec_get_schema(fixrec)
  -- local tid = silk.schema_get_template_id(s)
  -- if not schema_cache[tid] then
  --   schema_cache[tid] = true
  --   io.stderr:write(string.format("%s [%d] :: ", s, tid))
  --   for _,k in ipairs(s) do
  --     io.stderr:write(string.format("%s, ", k.name))
  --   end
  --   io.stderr:write("\n")
  -- end

  -- determine the flowtype based on the IP addresses
  local fwd_type, rev_type = classify_flowtypes(probe, fwd_rec)

  -- handle firewall events; "firewallEvent" is looked up first with the
  -- value of "NF_F_FW_EVENT" (itself defaulting to 0) passed as the third
  -- argument -- presumably the fallback used when the field is absent
  local fw = silk.fixrec_get_value(
    fixrec, "firewallEvent",
    silk.fixrec_get_value(fixrec, "NF_F_FW_EVENT", 0))
  if fw == 3 then
    -- this is a flow denied event; store this as "null" unless the
    -- type is already set to "other"
    if fwd_type ~= ft_name_id_map.other then
      local saddr = fwd_rec.sip
      if probe.external[saddr] then
        fwd_type = ft_name_id_map.innull
        rev_type = ft_name_id_map.innull
      elseif probe.internal[saddr] then
        fwd_type = ft_name_id_map.outnull
        rev_type = ft_name_id_map.outnull
      end
    end
  elseif fw ~= 2 then
    -- 2 is a flow deleted (flow closed / flow ended) event; store
    -- these and ignore everything else
    return
  end

  -- adjust the packet and byte counts if necessary
  if fwd_rec.packets == 0 then
    fwd_rec.packets = 1
  end
  if fwd_rec.bytes == 0 then
    fwd_rec.bytes = 1
  end

  -- adjust the time if necessary
  if fwd_rec.stime == epoch then
    fwd_rec.stime = fixrec.observationTimeMilliseconds
  end
  -- there is never an end time, so use the start time
  fwd_rec.etime = fwd_rec.stime

  -- Set flowtype and sensor
  fwd_rec.classtype_id = fwd_type
  fwd_rec.sensor_id = probe.repo_key.sensor

  -- Point the repository key at this record's flowtype and start time
  -- before writing it
  probe.repo_key.flowtype = fwd_rec.classtype_id
  probe.repo_key.timestamp = fwd_rec.stime

  -- Write record
  write_rwrec(fwd_rec, file_info)

  -- If responderOctets is non-zero, then rev_rec is non-nil
  if rev_rec then
    -- Set flowtype and sensor on the reverse record.
    rev_rec.classtype_id = rev_type
    rev_rec.sensor_id = probe.repo_key.sensor

    -- Only the packet count is adjusted here; the byte count is left
    -- untouched (see the responderOctets note above).
    if rev_rec.packets == 0 then
      rev_rec.packets = 1
    end
    if rev_rec.stime == epoch then
      rev_rec.stime = fixrec.observationTimeMilliseconds
    end
    rev_rec.etime = rev_rec.stime

    -- Update the repository key with the reverse record's flowtype and
    -- start time
    probe.repo_key.flowtype = rev_rec.classtype_id
    probe.repo_key.timestamp = rev_rec.stime

    -- Write the reverse record
    write_rwrec(rev_rec, file_info)
  end
end

--
-- Configuration variables
--

-- Input: a single NetFlow v9 probe listening on UDP.
input = {
  mode = "stream",
  probes = {
    P0 = {
      name = "P0",
      type = "netflow-v9",
      source = {
        listen = "127.0.0.1:44556",
        protocol = "udp",
      },
      packing_function = pack_function,
      -- presumably exposed to pack_function as fields of its probe
      -- argument (it reads probe.internal, probe.external and
      -- probe.repo_key) -- confirm against the packer documentation
      vars = {
        internal = silk.ipwildcard("192.168.x.x"),
        external = silk.ipwildcard("10.0.0.0/8"),
        -- NOTE(review): "null" is not referenced by pack_function
        null = silk.ipwildcard("172.16.0.0/13"),
        repo_key = {
          sensor = silk.site.sensor_id("S0"),
          -- timestamp and flowtype are overwritten by pack_function
          -- before each record is written
          timestamp = 0,
          flowtype = 0,
        },
      },
    },
  },
}

-- Output: local storage rooted at rootdir.
output = {
  mode = "local-storage",
  flush_interval = 10,
  processing = {
    directory = topdir .. "/processing",
    error_directory = topdir .. "/error",
  },
  root_directory = rootdir,
}

-- Logging settings.
log = {
  directory = topdir .. "/log",
  level = "debug",
}

-- Daemon settings.
daemon = {
  fork = true,
}