Creating Splunk Alerts (aka 'Saved Searches') from the command line

Splunk Alerts (also called saved searches) are a great way to have Splunk send you data on a scheduled basis, or when certain conditions are met (e.g. a metric crosses a threshold). While these alerts can easily be created in the web UI (by clicking “Save As/Alert” in a search), in many cases it would be nice to do it programmatically. This makes it easy to set up alerts for many individual searches, while keeping everything under source control.

First of all, let’s specify our search. We will put each search in its own text file. As an example, here is a really simple search that counts the number of 404 errors by sourcetype:

index=* | stats count(eval(status="404")) AS count_status BY sourcetype

This is the same search definition as you would enter in the Splunk search app UI. It’s a good idea to test and refine the search interactively until you’re happy with it, then save it in a file called my_search.txt.

The Splunk API

Next we’re going to write some Python to drive the Splunk API. The following code assumes you have a valid auth token in the environment variable SPLUNK_AUTH_TOKEN. Here is a little helper script you can use to set this variable (assuming you have xmllint installed):

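 #!/bin/bash
 #
 # Log in to Splunk and print an export statement for SPLUNK_AUTH_TOKEN
 # (bash is required: the script uses `read -s` and `[[ ]]`)
 #
 # Usage: eval $( ./splunk-login.sh )
 #

 SPLUNK_HOST="https://splunk.int.corp:8089"

 # bash writes the prompts to stderr, so eval only sees the export line
 read -p "Username: " USERNAME
 read -s -p "Password: " PASSWORD
 echo >&2

 # -k skips TLS certificate verification; --data-urlencode copes with special characters in the credentials
 response=$( curl -s -k --data-urlencode "username=${USERNAME}" --data-urlencode "password=${PASSWORD}" "${SPLUNK_HOST}/services/auth/login" )

 SPLUNK_AUTH_TOKEN=$( echo "$response" | xmllint --nowarning --xpath '//response/sessionKey/text()' - 2>/dev/null )
 if [[ $? -eq 0 ]] ; then
     echo "export SPLUNK_AUTH_TOKEN=${SPLUNK_AUTH_TOKEN}"
 else
     # login failed: show the server's message instead
     echo "$response" | xmllint --xpath '//response/messages/msg/text()' - >&2
     echo >&2
 fi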
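
If you would rather stay in Python, the token-extraction step of the script above can be sketched with the standard library alone. This assumes the login endpoint returns XML shaped the way the xmllint XPath expressions imply (a sessionKey element on success, a messages/msg element on failure). The function name and the sample responses are made up for illustration.

```python
import xml.etree.ElementTree as ET


def extract_session_key(response_xml):
    """Return the session key from a login response, or raise ValueError."""
    root = ET.fromstring(response_xml)  # root is the <response> element
    key = root.findtext("sessionKey")
    if key:
        return key
    # On failure, surface whatever message the server sent back
    msg = root.findtext("messages/msg") or "no sessionKey in response"
    raise ValueError(msg)


# Made-up sample responses, for illustration only
ok = "<response><sessionKey>abc123</sessionKey></response>"
bad = '<response><messages><msg type="WARN">Login failed</msg></messages></response>'

print(extract_session_key(ok))
try:
    extract_session_key(bad)
except ValueError as err:
    print("login error:", err)
```

The returned string is what the rest of this post expects to find in SPLUNK_AUTH_TOKEN.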

You’ll also need to install the Splunk SDK for Python. Depending on how your environment is configured, this should be as simple as typing pip install splunk-sdk.

Ok, on to the code!

#!/usr/bin/env python

import os
from splunklib import client

splunk_host = os.getenv("SPLUNK_HOSTNAME", "splunk.int.corp")
splunk_token = os.getenv("SPLUNK_AUTH_TOKEN")
splunk_app = 'my_app'

service = client.Service(host = splunk_host, token = splunk_token, app = splunk_app)

Great, we now have a reference to the Splunk API service. So how do we use the SDK to create a saved search? The Splunk API documentation is slightly terrifying. Luckily, we don’t need to worry about the vast majority of the available parameters. Let’s create an alert that runs on a schedule and sends its results to a webhook:

filename = 'my_search.txt'
search_name = os.path.splitext(os.path.basename(filename))[0]
search = open(filename, 'r').read()

params = {
    'actions': 'webhook',
    'action.webhook.param.url': 'http://splunk-webhook-service/',
    'alert_comparator': 'greater than',
    'alert_threshold': '0',
    'alert_type': 'number of events',
    'alert.digest_mode': '0',
    'alert.suppress': '0',
    'cron_schedule': '0 1 * * *',
    'dispatch.earliest_time': '-30d@d',
    'dispatch.latest_time': 'now',
    'display.general.type': 'statistics',
    'display.page.search.mode': 'fast',
    'display.page.search.tab': 'statistics',
    'is_scheduled': '1',
    'request.ui_dispatch_app': splunk_app,
    'request.ui_dispatch_view': 'search'
}

service.saved_searches.create(search_name, search, **params)

And that’s it! Easy.

There are a few things in that dictionary of parameters to the API call that are worth calling attention to.

(*) Note that I have not yet tested this, so I’m not sure which other parameters are required.
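
Since a typo in the parameter dictionary only shows up once the API call is made, a small local sanity check can catch the obvious mistakes first. The following is a minimal sketch, not anything Splunk itself provides: the function name and both rules (five cron fields, a URL for the webhook action) are assumptions chosen for this example and say nothing about what the API actually requires.

```python
def check_alert_params(params):
    """Return a list of problems found in a saved-search parameter dict."""
    problems = []

    # A scheduled search needs a cron spec; the usual form has five fields
    cron = params.get('cron_schedule', '')
    if params.get('is_scheduled') == '1' and len(cron.split()) != 5:
        problems.append("cron_schedule should have five fields, got %r" % cron)

    # Local convention for this sketch: a webhook action must come with a URL
    actions = [a.strip() for a in params.get('actions', '').split(',') if a.strip()]
    if 'webhook' in actions and not params.get('action.webhook.param.url'):
        problems.append("webhook action listed but action.webhook.param.url is missing")

    return problems


example = {
    'actions': 'webhook',
    'action.webhook.param.url': 'http://splunk-webhook-service/',
    'cron_schedule': '0 1 * * *',
    'is_scheduled': '1',
}
print(check_alert_params(example))
```

Running it over each parameter dictionary before calling create keeps malformed definitions away from the server.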

Fully worked example

The above code is all you actually need, but here’s a slightly expanded example that accepts command line arguments and multiple search files. It also deletes any existing search of the same name (i.e. your new search replaces the old one), and randomises the crontab spec slightly to spread out load on the Splunk server.

#!/usr/bin/env python 
 
import os, argparse, random 
from splunklib import client 
 
splunk_host = os.getenv("SPLUNK_HOSTNAME", "splunk.int.corp") 
splunk_token = os.getenv("SPLUNK_AUTH_TOKEN") 
 
def create_alert(search_name, search, splunk_app, webhook_host): 
    crontab = "%s 1 * * *" % random.randint(1, 59) # pick a random minute during 01:01 - 01:59 
    params = { 
        'actions': 'webhook', 
        'action.webhook.param.url': webhook_host, 
        'alert_comparator': 'greater than', 
        'alert_threshold': '0', 
        'alert_type': 'number of events', 
        'alert.digest_mode': '0', 
        'alert.suppress': '0', 
        'cron_schedule': crontab, 
        'dispatch.earliest_time': '-30d@d', 
        'dispatch.latest_time': 'now', 
        'display.general.type': 'statistics', 
        'display.page.search.mode': 'fast', 
        'display.page.search.tab': 'statistics', 
        'is_scheduled': '1', 
        'request.ui_dispatch_app': splunk_app, 
        'request.ui_dispatch_view': 'search' 
    } 
 
    service = client.Service(host = splunk_host, token = splunk_token, app = splunk_app) 
    savedsearches = service.saved_searches 
    try:
        savedsearches.delete(search_name)
        print("Deleted old version of %s" % search_name)
    except KeyError:
        # the SDK raises KeyError when no search of this name exists; nothing to replace
        pass
 
    savedsearches.create(search_name, search, **params) 
    print("Created alert '%s' to be triggered at %s" % (search_name, crontab)) 
 
 
if __name__ == '__main__': 
    parser = argparse.ArgumentParser(description=""" 
        Create Splunk alerts. Assumes there is a valid auth token in SPLUNK_AUTH_TOKEN 
        (e.g. `eval $(./splunk-login.sh )`). Default Splunk API hostname (splunk.int.corp)  
        can be overridden by setting SPLUNK_HOSTNAME. 
    """) 
    parser.add_argument('files', metavar='file', type=str, nargs='+', help='a file that contains a search definition') 
    parser.add_argument('--webhook_url', nargs='?', dest='webhook_host',  
                        default='https://splunk-webhook-service.int.corp/',
                        help='the URL of the webhook the alert will be configured to POST to') 
    parser.add_argument('--splunk_app', nargs='?', dest='splunk_app', 
                        default='search',  
                        help='the name of the Splunk app that will own the search (defaults to "search")') 
 
    args = parser.parse_args() 
 
    for f in args.files: 
        name = os.path.splitext(os.path.basename(f))[0] 
        search = open(f, 'r').read() 
        create_alert(name, search, args.splunk_app, args.webhook_host) 
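
The schedule randomisation inside create_alert can also be pulled out into its own small function, which makes it easy to exercise without a Splunk server. This is a sketch under the same assumptions as the script above (one run per day, at a random minute between 01 and 59 of the chosen hour). The function name and the rng parameter are illustrative additions, not part of the original script.

```python
import random


def random_daily_cron(hour=1, rng=random):
    """Build a cron spec that fires once a day at a random minute within `hour`."""
    minute = rng.randint(1, 59)  # same range as the script above: minutes 01-59
    return "%d %d * * *" % (minute, hour)


# Passing a seeded generator makes the chosen minute repeatable, e.g. in tests
spec = random_daily_cron(rng=random.Random(42))
print(spec)
```

Note that a random minute means the schedule changes every time the alert is recreated, which is fine for spreading load but worth knowing if you compare runs.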

Conclusion

So despite the somewhat lacking official documentation, creating Splunk saved searches is actually pretty straightforward. Thanks to Alexander Leonov for this post that got me headed in the right direction: https://avleonov.com/2019/01/17/creating-splunk-alerts-using-api/