
How to Filter Out Events at the Indexer by Date in Splunk

By: Jon Walthour | Splunk Consultant

 

A customer presented me with the following problem recently. He needed to be able to exclude events older than a specified time from being indexed in Splunk. His requirement was more precise than excluding events older than so many days ago. He was also dealing with streaming data coming in through an HTTP Event Collector (HEC). So, his data was not file-based, where an “ignoreOlderThan” setting in an inputs.conf file on a forwarder would solve his problem.

As I thought about his problem, I agreed with him—using “ignoreOlderThan” was not an option. Besides, this would work only based on the modification timestamp of a monitored file, not on the events themselves within that file. The solution to his problem needed to be more granular, more precise.

We needed a way to exclude events from being indexed into Splunk through whatever means they were arriving at the parsing layer (from a universal forwarder, via syslog or HEC) based on a precise definition of a time. This meant that it had to be more exact than a certain number of days ago (as in, for example, the “MAX_DAYS_AGO” setting in props.conf).

To meet his regulatory requirements for retention, my customer needed to be able to exclude, for example, events older than January 1 at midnight and do so with certainty.

As I set about finding (or creating) a solution, I found “INGEST_EVAL,” a setting in transforms.conf. This setting was introduced in version 7.2. It runs an eval expression at index time on the parsing (indexing) tier, much as a search-time eval expression does, though not identically. The biggest difference is that it runs in the indexing pipeline, so any new fields it creates become indexed fields rather than search-time fields. These fields are stored in the rawdata journal of the index.

However, what if I could do an “if-then” type of statement in an eval that would change the value of a current field? What if I could evaluate the timestamp of the event, determine if it’s older than a given epoch date and change the queue the event was in from the indexing queue (“indexQueue”) to oblivion (“nullQueue”)?

I found some examples of this in Splunk’s documentation, but none of them worked for this specific use case. I also found that “INGEST_EVAL” supports only a limited set of eval functions. Functions like “relative_time()” and “now()” don’t work. In addition, at the point in the ingestion pipeline where Splunk runs these INGEST_EVAL statements, fields like “_indextime” aren’t yet defined. This left me with using an older “time()” function. So, when you work with this feature, test your eval expression carefully, because the documentation does not yet fully describe which functions work at index time.

 

Here’s what I came up with:

props.conf

[<sourcetype>]

TRANSFORMS-oldevents=delete-older-than-January-1-2020-midnight-GMT

transforms.conf

[delete-older-than-January-1-2020-midnight-GMT]

INGEST_EVAL = queue=if(substr(tostring(1577836800-_time),1,1)="-", "indexQueue", "nullQueue")
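The constant 1577836800 in the stanza above is the Unix epoch for January 1, 2020 at midnight GMT. If you need a different cutoff, one way to derive the number is a few lines of Python. This is only a convenience sketch that runs outside Splunk; the helper name cutoff_epoch is made up for illustration.

```python
from datetime import datetime, timezone


def cutoff_epoch(year, month, day):
    """Return the Unix epoch (whole seconds) for midnight GMT/UTC on the given date.

    Hypothetical helper for illustration; it is not part of Splunk.
    """
    return int(datetime(year, month, day, tzinfo=timezone.utc).timestamp())


# The cutoff used in the transforms.conf stanza above.
print(cutoff_epoch(2020, 1, 1))  # 1577836800
```

Paste the resulting number into the INGEST_EVAL expression in place of 1577836800, and rename the stanza to match the new date.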

 

The key is in the evaluation of the first character of the subtraction in the “queue=” calculation. A negative number yields a “-” for the first character; a positive number yields a digit. Negative numbers mean the event is “younger than” your cutoff and positive numbers mean it is “older than” the cutoff. An event stamped exactly at the cutoff yields zero, so it is treated as older. You keep the younger events by sending them to the indexQueue (by setting “queue” equal to “indexQueue”) and you chuck older events by sending them to the nullQueue (by setting “queue” equal to “nullQueue”).
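To sanity-check the sign test before deploying it, the same logic can be mirrored outside Splunk. The Python below is a rough model under stated assumptions, not Splunk's implementation: the function name route is invented for this sketch, and Python's str() stands in for the eval tostring() function only as far as the leading minus sign is concerned.

```python
CUTOFF = 1577836800  # January 1, 2020 at midnight GMT, as in the stanza above


def route(event_time, cutoff=CUTOFF):
    """Model the INGEST_EVAL sign test for one event timestamp (epoch seconds).

    Hypothetical helper: returns the queue name the expression would assign.
    """
    difference = str(cutoff - event_time)  # stand-in for tostring(cutoff - _time)
    # A leading "-" means the event is newer than the cutoff, so keep it.
    return "indexQueue" if difference[0] == "-" else "nullQueue"


print(route(1577836801))  # one second after the cutoff: indexQueue
print(route(1577836799))  # one second before the cutoff: nullQueue
print(route(1577836800))  # exactly at the cutoff: nullQueue
```

The last case is the one worth noticing: the difference is zero, which has no leading minus sign, so an event stamped exactly at the cutoff is dropped along with the older ones.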

Needless to say, my customer was pleased with the solution we provided. It addressed his use case “precisely.” I hope it is helpful for you, too. Happy Splunking!
