Icinga Notifications: Custom Channel Plugins

by | Oct 8, 2024

As many of you have already seen in our previous blog posts and our early beta release, we’re working on a new, independent notification module.

Right now, we only offer three ready-made channels for sending notifications. Today, I want to show you how you can create your own channel and add it to the Icinga Notifications module.

In this blog post, I’ll show you how to build a bridge to Telegram and send new notifications to a group chat. The program logic is written in Python 3.11, but you can use any scripting or programming language that can read from standard input and write to standard output.

Full program code: TelegramBridge.py

#!/usr/bin/env python3
from __future__ import annotations

import json
import os
import re
import sys
from json import JSONDecodeError
from types import SimpleNamespace

import requests


class Request:
    def __init__(self, id: int, method: str, params: dict = None) -> None:
        self.id = id
        self.method = method
        self.params = params


class Response:
    def __init__(self, id: int, error: str = None, result: object = None) -> None:
        self.id = id
        self.error = error
        self.result = result

    def __str__(self) -> str:
        try:
            return json.dumps(
                Response.__remove_empty_properties(self.__dict__.copy()),
                default=lambda o: o.__dict__,
                sort_keys=True,
                indent=None
            )
        except Exception:
            return self.__repr__()

    @staticmethod
    def __remove_empty_properties(dictionary) -> dict:
        for k, v in list(dictionary.items()):
            if v is None:
                del dictionary[k]
            elif isinstance(v, dict):
                Response.__remove_empty_properties(v)
        return dictionary


class TelegramBridge:
    NOTIFICATION_TEMPLATE: str = '''
<b>{severity} | {title} | #{id}</b>
<a href="{url}">🔗 {url_title}</a>
<pre>{text}</pre>
'''
    # as taken from
    # https://github.com/Icinga/icinga-notifications/blob/114c1be12dbe6ff8d3b9280fb67e02812100d101/internal/event/severity.go#L25-L33
    SEVERITY_ICON: dict[str, str] = {
        'ok': '🟢 OK',
        'debug': '⚪ DEBUG',
        'info': '🔵 INFO',
        'notice': '⚫ NOTICE',
        'warning': '🟠 WARNING',
        'err': '🔴 ERROR',
        'crit': '🔴 CRITICAL',
        'alert': '🟣 ALERT',
        'emerg': '🟣 EMERGENCY'
    }

    def __init__(self: TelegramBridge) -> None:
        self.token = None
        self.chat_id = None
        self.base_url = None
        self.log("starting")

    @staticmethod
    def log(message: str) -> None:
        sys.stderr.write("[telegramBridge] {message}{linesep}".format(message=message, linesep=os.linesep))
        sys.stderr.flush()

    @staticmethod
    def return_and_continue(response: Response) -> None:
        if response.error:
            # removing result in case of an error
            response.result = None
            TelegramBridge.log("returning an error: {}".format(response.error))

        TelegramBridge.log("returning response: {response}{linesep}".format(response=response, linesep=os.linesep))
        sys.stdout.write("{response}{linesep}".format(response=response, linesep=os.linesep))
        sys.stdout.flush()

    @staticmethod
    def return_and_exit(response: Response) -> None:
        TelegramBridge.return_and_continue(response)

        if response.error:
            sys.exit(1)
        else:
            sys.exit(0)

    def send_message(self, text: str) -> None:
        url = self.base_url + 'sendMessage'
        data = {
            'chat_id': self.chat_id,
            'text': text,
            'parse_mode': 'HTML',
            'disable_web_page_preview': True
        }
        requests.post(url, json=data)

    def run(self) -> None:
        self.log("waiting for requests")
        for line in sys.stdin:
            parsed_id = None
            raw = None

            # parse request identifier (best effort, so decoding errors can still reference it)
            id_match = re.match(r'.*"id"\s*:\s*(\d+)', line)
            if id_match:
                parsed_id = int(id_match.group(1))

            # parse request
            try:
                raw = json.loads(line.strip())
            except JSONDecodeError as err:
                self.return_and_exit(Response(id=parsed_id, error="failed decoding input: {}".format(err.msg)))

            # map request
            if raw:
                request = Request(**raw)
                self.log("received request: {}".format(vars(request)))
                self.handle_request(request)
            else:
                self.log("dropped input as it couldn't be mapped to a request")

        self.log("exiting")

    def handle_request(self, request: Request) -> None:
        match request.method:
            case 'GetInfo':
                # return information to Icinga Notifications about this channel
                info = SimpleNamespace()
                info.name = 'TelegramBridge'
                info.version = '0.1.0'
                info.author = 'Icinga GmbH'
                info.config_attrs = [
                    {
                        "name": "bot_token",
                        "type": "secret",
                        "label": {
                            "de_DE": "Telegram Token",
                            "en_US": "Telegram Token"
                        },
                        "help": {
                            "de_DE": "Authentifizierungstoken von Telegram.",
                            "en_US": "Authentication token from Telegram."
                        },
                        "required": True,
                        "min": None,
                        "max": None
                    },
                    {
                        "name": "chat_identifier",
                        "type": "number",
                        "label": {
                            "de_DE": "Chat Identifikator",
                            "en_US": "Chat Identifier"
                        },
                        "help": {
                            "de_DE": "Chat Identifikator, welcher den gewünschten Telegram Chat referenziert, "
                                     "in welchem die Benachrichtigungen landen sollen.",
                            "en_US": "Chat identifier that references the desired Telegram chat, which should receive "
                                     "the notifications."
                        },
                        "required": True,
                        "min": None,
                        "max": -1
                    }
                ]

                self.return_and_exit(Response(id=request.id, result=info))
            case 'SetConfig':
                missing_fields = []
                invalid_fields = []

                if not request.params:
                    self.return_and_exit(Response(id=request.id, error="invalid config (missing parameters)"))
                conf = request.params

                # check for required fields
                if not 'bot_token' in conf:
                    missing_fields.append('bot_token')
                if not 'chat_identifier' in conf:
                    missing_fields.append('chat_identifier')

                if len(missing_fields) > 0:
                    # return an error as at least one required field seems missing
                    self.return_and_exit(Response(id=request.id, error="invalid config (missing required field(s) '{}'"
                                                                       ")".format(', '.join(missing_fields))))
                # validate fields
                if len(conf['bot_token']) < 44 or len(conf['bot_token']) > 46:
                    invalid_fields.append('bot_token')
                if not conf['chat_identifier'][0:1] == '-':
                    invalid_fields.append('chat_identifier')
                else:
                    try:
                        int(conf['chat_identifier'])
                    except ValueError:
                        invalid_fields.append('chat_identifier')

                if len(invalid_fields) > 0:
                    # return an error as at least one field seems invalid
                    self.return_and_exit(Response(id=request.id, error="invalid config (invalid field(s) '{}'"
                                                                       ")".format(', '.join(invalid_fields))))

                # set configuration to current instance
                self.token = conf['bot_token']
                self.chat_id = conf['chat_identifier']
                self.base_url = 'https://api.telegram.org/bot{token}/'.format(token=self.token)

                # all good
                self.return_and_continue(Response(id=request.id))
            case 'SendNotification':
                # check for required payload
                if not request.params:
                    self.return_and_exit(Response(id=request.id, error="invalid notification (missing parameters)"))
                notification = request.params

                # check if notification payload contains a linked object
                if not 'object' in notification:
                    self.return_and_exit(Response(id=request.id, error="notification is missing its linked object."))

                # send notification to linked telegram chat
                title = ''
                if 'service' in notification['object']['tags']:
                    title = "{service} on {host}".format(
                        service=notification['object']['tags']['service'],
                        host=notification['object']['tags']['host']
                    )
                else:
                    title = notification['object']['tags']['host']
                self.send_message(text=TelegramBridge.NOTIFICATION_TEMPLATE.format(
                    severity=TelegramBridge.SEVERITY_ICON[notification['incident']['severity']],
                    title=title,
                    id=notification['incident']['id'],
                    url=notification['incident']['url'],
                    url_title="View Incident",
                    text=notification['event']['message']
                ))

                self.return_and_continue(Response(id=request.id))
            case _:
                self.log("request method unknown: {}".format(request.method))
                self.return_and_exit(Response(id=request.id, error="invalid request (method unknown)"))


# run entrypoint
if __name__ == '__main__':
    app = TelegramBridge()
    app.run()

 

Understanding the communication

First, let’s take a quick look at the module. According to the documentation, it looks for executable files in its channel directory when it starts. On Unix-based systems, this directory defaults to /usr/libexec/icinga-notifications/channels or /usr/lib/icinga-notifications/channels.
You can store scripts or binaries there; the module scans the directory at startup and calls each executable it finds.

The module and the custom program communicate via JSON objects, derived from the JSON-RPC standard. Your program gets requests via program input (standard input) and responds to the module via its output (standard output).
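To make the exchange more concrete, here is a minimal sketch of such a request/response loop. It uses in-memory streams instead of real pipes, and the "Ping" method is a hypothetical placeholder for illustration only, not a method of Icinga Notifications. The helper names `handle_line` and `serve` are likewise made up for this sketch.

```python
import io
import json


def handle_line(line: str) -> str:
    """Turn one JSON request line into one JSON response line."""
    try:
        request = json.loads(line)
    except json.JSONDecodeError as err:
        # Without a parsable request there is no id to echo back.
        return json.dumps({"error": "failed decoding input: {}".format(err.msg)})
    if not isinstance(request, dict):
        return json.dumps({"error": "invalid request (not a JSON object)"})

    # Echo the request id so the caller can match the response to its request.
    response = {"id": request.get("id")}
    if request.get("method") == "Ping":  # hypothetical method, illustration only
        response["result"] = "pong"
    else:
        response["error"] = "invalid request (method unknown)"
    return json.dumps(response, sort_keys=True)


def serve(stdin, stdout) -> None:
    """Read requests line by line and write exactly one response per line."""
    for line in stdin:
        if line.strip():
            stdout.write(handle_line(line) + "\n")
            stdout.flush()


# Simulate the module's side with in-memory streams.
fake_stdin = io.StringIO('{"id": 1, "method": "Ping"}\n{"id": 2, "method": "Nope"}\n')
fake_stdout = io.StringIO()
serve(fake_stdin, fake_stdout)
print(fake_stdout.getvalue(), end="")
```

In a real channel you would pass `sys.stdin` and `sys.stdout` to `serve` and keep all diagnostic output away from standard output, since every line written there is read as a response.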

The Icinga Notifications module opens a new process for each file found and first sends a request to get the channel information (GetInfo JSON object). Our program should respond with a JSON object containing that information. The required fields are in the docs under GetInfo and type Info.

It’s important to get the configuration parameters right, as these control the input fields for configuring the channel via the web interface.
In my example, we need a so-called bot token for communication with Telegram. This is needed for authentication via Telegram’s REST API, and our program needs to use it during runtime. I also store the chat identifier, which defines the chat to which our program should send the message when using Telegram’s REST API. This can be an individual or a group chat.

In this example, we send the notifications to a group chat.

Here’s how our program builds this data in Python (it is serialized to JSON when the response is written):

from types import SimpleNamespace

...

# return information to Icinga Notifications about this channel
info = SimpleNamespace()
info.name = 'TelegramBridge'
info.version = '0.1.0'
info.author = 'Icinga GmbH'
info.config_attrs = [
    {
        "name": "bot_token",
        "type": "secret",
        "label": {
            "de_DE": "Telegram Token",
            "en_US": "Telegram Token"
        },
        "help": {
            "de_DE": "Authentifizierungstoken von Telegram.",
            "en_US": "Authentication token from Telegram."
        },
        "required": True,
        "min": None,
        "max": None
    },
    {
        "name": "chat_identifier",
        "type": "number",
        "label": {
            "de_DE": "Chat Identifikator",
            "en_US": "Chat Identifier"
        },
        "help": {
            "de_DE": "Chat Identifikator, welcher den gewünschten Telegram Chat referenziert, "
                     "in welchem die Benachrichtigungen landen sollen.",
            "en_US": "Chat identifier that references the desired Telegram chat, which should receive "
                     "the notifications."
        },
        "required": True,
        "min": None,
        "max": -1
    }
]

...

If we follow the instructions in the documentation and return a proper JSON object, the module should include our channel in the available channel list and display it in the right places in the web interface when it’s started.
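For orientation, the following sketch shows roughly what ends up on standard output when such an info object is serialized the way the Response class in this post does it. The attribute list is shortened (labels and help texts are omitted), and the request id 1 is an assumption; the real id is whatever the module sent.

```python
import json
from types import SimpleNamespace

info = SimpleNamespace()
info.name = "TelegramBridge"
info.version = "0.1.0"
info.author = "Icinga GmbH"
# Shortened on purpose: the full attributes also carry label, help, min and max.
info.config_attrs = [
    {"name": "bot_token", "type": "secret", "required": True},
    {"name": "chat_identifier", "type": "number", "required": True},
]

# The id is assumed here; a real response echoes the id of the request.
response = {"id": 1, "result": info}

# default= converts the SimpleNamespace into a plain dict during serialization.
wire = json.dumps(response, default=lambda o: o.__dict__, sort_keys=True)
print(wire)
```

The result is a single line of JSON, which matters because the program in this post reads and writes one message per line.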

So far, so good. But what happens now when our input fields are configured in the web interface?

Once the channel configuration is stored in the module’s database, the module sends another request to our program whenever the process is started, right after it has received the channel information.
This time, the request configures the current program instance (SetConfig JSON object).
Since this happens every time the Icinga Notifications module starts, we don’t have to keep the configuration in a separate database or configuration file.
Instead, we get the stored configuration fresh from the module each time (check the documentation).

 

Configuring our channel

Our program code has now told the module which fields are needed for the configuration and what they should look like, but we haven’t made any configuration entries in the system yet.

To do this, we just log in to our Icinga instance via the web interface and create a new channel via the Icinga Notifications module settings (settings → notifications → channel).

Screenshot: Icinga Telegram Channel Configuration

Once this step is done, the module should send us a request. We need to make sure the transmitted data is valid. In this example, there’s no strong validation, but at least we can check the length of the token and make sure the chat identifier starts with a minus sign. That’s because group chat identifiers in Telegram are in the negative range, while individual chat identifiers are always in the positive range.

If everything looks good, we save the token and chat ID in our instance variables and reply to the module with a simple JSON object that includes the ID we received in the request. The module can use these IDs to link the request and response together, so it can process multiple requests at the same time.

If something is missing from the configuration or has the wrong value, our program responds with an error in the JSON object.

...

missing_fields = []
invalid_fields = []

if not request.params:
    self.return_and_exit(Response(id=request.id, error="invalid config (missing parameters)"))
conf = request.params

# check for required fields
if not 'bot_token' in conf:
    missing_fields.append('bot_token')
if not 'chat_identifier' in conf:
    missing_fields.append('chat_identifier')

if len(missing_fields) > 0:
    # return an error as at least one required field seems missing
    self.return_and_exit(Response(id=request.id, error="invalid config (missing required field(s) '{}'"
                                                       ")".format(', '.join(missing_fields))))
# validate fields
if len(conf['bot_token']) < 44 or len(conf['bot_token']) > 46:
    invalid_fields.append('bot_token')
if not conf['chat_identifier'][0:1] == '-':
    invalid_fields.append('chat_identifier')
else:
    try:
        int(conf['chat_identifier'])
    except ValueError:
        invalid_fields.append('chat_identifier')

if len(invalid_fields) > 0:
    # return an error as at least one field seems invalid
    self.return_and_exit(Response(id=request.id, error="invalid config (invalid field(s) '{}'"
                                                       ")".format(', '.join(invalid_fields))))

# set configuration to current instance
self.token = conf['bot_token']
self.chat_id = conf['chat_identifier']
self.base_url = 'https://api.telegram.org/bot{token}/'.format(token=self.token)

# all good
self.return_and_continue(Response(id=request.id))

...
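The same checks can also be pulled into a small standalone function, which makes them easy to test without starting the whole channel. This is only a sketch: `validate_config` is a hypothetical helper, the length bounds are the rough check from this post rather than an official token format, and the sample values are made up.

```python
from __future__ import annotations


def validate_config(conf: dict) -> str | None:
    """Return an error message for a bad config, or None if it looks fine."""
    required = ("bot_token", "chat_identifier")
    missing = [name for name in required if name not in conf]
    if missing:
        return "invalid config (missing required field(s) '{}')".format(", ".join(missing))

    invalid = []
    # Rough length check as used in this post; not an official token format.
    if not 44 <= len(str(conf["bot_token"])) <= 46:
        invalid.append("bot_token")

    # Group chat identifiers are negative: require a leading minus and an integer.
    chat_id = str(conf["chat_identifier"])
    try:
        is_group = chat_id.startswith("-") and int(chat_id) < 0
    except ValueError:
        is_group = False
    if not is_group:
        invalid.append("chat_identifier")

    if invalid:
        return "invalid config (invalid field(s) '{}')".format(", ".join(invalid))
    return None


# Made-up values for illustration; this is not a real token or chat.
good = {"bot_token": "1234567890:" + "A" * 35, "chat_identifier": "-1001234567890"}
print(validate_config(good))
print(validate_config({"bot_token": "short", "chat_identifier": "42"}))
print(validate_config({}))
```

Returning a message instead of exiting directly keeps the decision about how to respond (continue or exit) in one place, next to the code that writes the response.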

Our code can already handle two requests from the module, which is great.
However, we are still missing one crucial piece: processing notifications. There’s detailed documentation on this under SendNotification.

 

Adding a technical contact

In order to receive notifications, we need to create a new contact in Icinga Notifications’ web interface.

This is a pseudo contact: there is no real person behind it, only a technical user, because we want to write to a group chat rather than notify individual people.

Our Telegram channel should be selected as the default channel.

Screenshot: Icinga Telegram Contact

 

Setting up an event rule

Once we’ve set up the contact, we need to create an event rule so that the module will know when to notify us.
In the event rule, we select our technical contact and decide when notifications should be sent to our channel. With the parameters shown here, that is the case when an incident has been open for more than three minutes and the severity of the underlying object is not “OK.”
It’s also worth noting that we’ll get a notification when an object changes back to the “OK” status, as long as we’ve already been notified about the same object with a different status.

Screenshot: Icinga Telegram Event Rule

 

The module now sends a notification to our program as soon as one is triggered in the system and our event rule matches.

The information that’s passed along includes the relevant parameters, such as:
– Contact: who the notification should go to (in our case, always our technical user).
– Linked object: in my example, an Icinga 2 host or service; this can vary depending on the source system that feeds the data.
– Incident: the opened incident with its identifier, URL, and severity (taken from the associated object).
– Event: the event linked to the object, including its type, time, and output.

An example payload of such a notification looks like this:

{
    "id": 5,
    "method": "SendNotification",
    "params": {
        "contact": {
            "full_name": "Icinga Telegram",
            "addresses": null
        },
        "object": {
            "name": "dummy-host",
            "url": "http://localhost/icingaweb2/icingadb/host?name=dummy-host",
            "tags": {
                "host": "dummy-host"
            },
            "extra_tags": {
                "hostgroup/app-xrp": "",
                "hostgroup/department-ps": "",
                "hostgroup/env-acceptance": "",
                "hostgroup/location-rome": ""
            }
        },
        "incident": {
            "id": 2689,
            "url": "http://localhost/icingaweb2/notifications/incident?id=2689",
            "severity": "crit"
        },
        "event": {
            "time": "2024-01-01T08:45:15.291537351Z",
            "type": "state",
            "username": "",
            "message": "Dummy output..."
        }
    }
}
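As a small sketch of how such a payload can be consumed, the snippet below parses a trimmed-down version of the example and derives a title from the object’s tags, in the same way the program in this post does. `build_title` is a hypothetical helper name, and "ping4" is a made-up service name.

```python
import json

# Trimmed-down version of the example payload (URLs and extra tags omitted).
raw = '''{"id": 5, "method": "SendNotification", "params": {
  "object": {"name": "dummy-host", "tags": {"host": "dummy-host"}},
  "incident": {"id": 2689, "severity": "crit"},
  "event": {"type": "state", "message": "Dummy output..."}}}'''


def build_title(tags: dict) -> str:
    """Use "service on host" for services and the plain host name otherwise."""
    if "service" in tags:
        return "{} on {}".format(tags["service"], tags["host"])
    return tags["host"]


params = json.loads(raw)["params"]
print(build_title(params["object"]["tags"]))
print(build_title({"host": "dummy-host", "service": "ping4"}))
```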

 

Notifying through Telegram

To finish things up and get everything into our Telegram chat, we just need to send the data through Telegram’s REST API. We can create a simple HTML template and populate it with the data from the notification payload.

I’ve added a little bit of code to make it easier to recognize the different severity levels. It precedes the respective states with an additional Unicode emoji (in the form of a colorful dot).

Our program code needs to either confirm the notifications with the request identifier or reject them with an error.

...

NOTIFICATION_TEMPLATE: str = '''
<b>{severity} | {title} | #{id}</b>
<a href="{url}">🔗 {url_title}</a>
<pre>{text}</pre>
'''

# as taken from
# https://github.com/Icinga/icinga-notifications/blob/114c1be12dbe6ff8d3b9280fb67e02812100d101/internal/event/severity.go#L25-L33
SEVERITY_ICON: dict[str, str] = {
    'ok': '🟢 OK',
    'debug': '⚪ DEBUG',
    'info': '🔵 INFO',
    'notice': '⚫ NOTICE',
    'warning': '🟠 WARNING',
    'err': '🔴 ERROR',
    'crit': '🔴 CRITICAL',
    'alert': '🟣 ALERT',
    'emerg': '🟣 EMERGENCY'
}

...

# check for required payload
if not request.params:
    self.return_and_exit(Response(id=request.id, error="invalid notification (missing parameters)"))
notification = request.params

# check if notification payload contains a linked object
if not 'object' in notification:
    self.return_and_exit(Response(id=request.id, error="notification is missing its linked object."))

# send notification to linked telegram chat
title = ''
if 'service' in notification['object']['tags']:
    title = "{service} on {host}".format(
        service=notification['object']['tags']['service'],
        host=notification['object']['tags']['host']
    )
else:
    title = notification['object']['tags']['host']
self.send_message(text=TelegramBridge.NOTIFICATION_TEMPLATE.format(
    severity=TelegramBridge.SEVERITY_ICON[notification['incident']['severity']],
    title=title,
    id=notification['incident']['id'],
    url=notification['incident']['url'],
    url_title="View Incident",
    text=notification['event']['message']
))

self.return_and_continue(Response(id=request.id))

...
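One thing to keep in mind with Telegram’s HTML parse mode: characters such as <, > and & in the inserted values have to be escaped, otherwise check output containing them can break the message. A minimal sketch using Python’s standard `html.escape` could look like this; the `render` helper and the simplified template are assumptions for illustration, not part of the original program.

```python
import html

# Simplified variant of the template from this post.
TEMPLATE = (
    '<b>{severity} | {title} | #{id}</b>\n'
    '<a href="{url}">{url_title}</a>\n'
    '<pre>{text}</pre>'
)


def render(severity: str, title: str, incident_id: int, url: str, text: str) -> str:
    """Fill the template, escaping every value that comes from the payload."""
    return TEMPLATE.format(
        severity=html.escape(severity, quote=False),
        title=html.escape(title, quote=False),
        id=incident_id,
        url=html.escape(url, quote=True),  # the URL sits inside an attribute
        url_title="View Incident",
        text=html.escape(text, quote=False),
    )


message = render(
    severity="CRITICAL",
    title="dummy-host",
    incident_id=2689,
    url="http://localhost/icingaweb2/notifications/incident?id=2689",
    text="load < 5 & rising",
)
print(message)
```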

 

Conclusion

This is what the final result looks like in Telegram:

Screenshot: Telegram Output

Finally, I would like to mention again that you are free to choose your programming or scripting language as long as you can implement the specifications regarding the JSON objects in the requests and responses.

In this example, I have deliberately left out error handling for Telegram’s REST API, as it is not needed to understand this blog post. The program code shown here should therefore not be used in production as is.
Instead, it should give you a good overview of what to look out for when developing your own channel and how the program flow could be structured.
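If you want to add that missing error handling, one possible starting point is to inspect the JSON that Telegram’s Bot API returns: it contains an "ok" flag and, on failure, a "description" and an "error_code". The sketch below is a hypothetical helper (`telegram_error` is not part of the program above) that turns such a reply into an error message for the channel response. It works on plain values, so it can be tested without network access.

```python
from __future__ import annotations


def telegram_error(status_code: int, body: dict | None) -> str | None:
    """Return an error message for a failed API call, or None on success."""
    if body is None:
        # The reply could not be decoded as JSON at all.
        return "telegram returned HTTP {} without a JSON body".format(status_code)
    if body.get("ok") is True:
        return None
    return "telegram rejected the message: {} (code {})".format(
        body.get("description", "no description"),
        body.get("error_code", status_code),
    )


# Sample replies for illustration only.
print(telegram_error(200, {"ok": True, "result": {}}))
print(telegram_error(400, {"ok": False, "error_code": 400,
                           "description": "Bad Request: chat not found"}))
```

With the requests library, you would pass `response.status_code` and the decoded `response.json()` to this helper, and it is worth setting a `timeout` on `requests.post` so that a hanging connection does not block the channel.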

And here’s a little bonus tip for developers: The routine that starts the process for your channel interprets any output in “stderr” as logging.
You can therefore write debug messages to the standard error and then read out the log from Icinga Notifications.
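As a sketch of how this can be done with Python’s standard logging module instead of manual writes, the snippet below routes all log records to a stream that defaults to standard error. The logger name and format are arbitrary choices for this example; the in-memory buffer only stands in for stderr so the output can be inspected.

```python
import io
import logging
import sys


def make_logger(stream=sys.stderr) -> logging.Logger:
    """Create a logger that writes only to the given stream (stderr by default)."""
    logger = logging.getLogger("telegramBridge")
    logger.handlers.clear()  # avoid duplicate handlers on repeated calls
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("[%(name)s] %(levelname)s: %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # keep records away from the root logger
    return logger


# Demonstration with an in-memory buffer in place of stderr.
buffer = io.StringIO()
make_logger(buffer).debug("received request id=%s", 5)
sys.stderr.write(buffer.getvalue())
```

Keeping log output strictly on standard error matters here, because anything written to standard output is read by the module as a response.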

Happy coding! 😊
