I am Nick Lang
Sat 05 Mar 2022

Nemesis

Managing resources in a single Elasticsearch cluster is straightforward: you log into Kibana, navigate the app and create the resources you want. Watches, roles, transforms, indexes, index settings, and so on are all Elasticsearch resources.

When you have a small team and a single cluster, managing these resources through the Kibana UI is usually the easiest way to do it. But what happens when you start to have multiple environments, such as QA, Staging and Production? You want to ensure that the resources you put in each environment are the same. You want to ensure that if a resource accidentally gets changed, you have a way to revert it back to its pristine state. What if you want to test that your resources are operating the way you expect them to? What if you have a Source of Truth (SoT) that lives in another service provider (or several) and you need to aggregate those values in order to create an Elasticsearch resource?

Nemesis is a Python library and CLI tool for managing Elasticsearch resources as code. This means that each Elasticsearch resource Nemesis supports is an actual Python object with type hinting and reusable sub-objects. Think of Nemesis as an SDK for Elasticsearch resources.

This makes Nemesis very useful because it’s composable, it’s transportable, it’s testable and it’s just Python. What does “just Python” mean? Nemesis isn’t a DSL. Nemesis isn’t a templating tool. It means you can just write Python to create, manipulate, test and ship your resources.

Getting started

First off we should install nemesis:

$ pip install nemesis

After installing nemesis let's create our first nemesis project.

$ mkdir my_first_project

$ cd my_first_project

$ nemesis new

Now the first thing you will see in this directory is the __nemesis__.py file.

This is where we put all our code. This is the driver for the nemesis cli.

In the __nemesis__.py file, the first thing you must do is instantiate the Nemesis object and assign it to a variable named n.

import os
from nemesis import Nemesis

username, password = os.environ.get('CLOUD_AUTH').split(":")

n = Nemesis(cloud_id=os.environ.get('CLOUD_ID'), username=username, password=password)
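
One note on the credential parsing: if CLOUD_AUTH isn't set, the split above fails with a fairly cryptic AttributeError. A slightly more defensive variant (plain Python, nothing nemesis-specific) might look like this:

import os

cloud_auth = os.environ.get("CLOUD_AUTH")
if not cloud_auth or ":" not in cloud_auth:
    raise SystemExit("CLOUD_AUTH must be set in the form 'username:password'")

# Split only on the first colon, in case the password itself contains one.
username, password = cloud_auth.split(":", 1)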

Once our nemesis client has been instantiated, it’s time to create our resources.

from nemesis.resources.elasticsearch import index 

indexsettings = index.IndexSettings(
        index={
            "routing": {
                "allocation": {"include": {"_tier_preference": "data_content"}}
            },
            "number_of_shards": "1",
            "number_of_replicas": "2",
        }
)

test_index = index.Index(name="test-index", settings=indexsettings)

Finally, the last step is to register the resource with the nemesis client.

n.register(test_index)

Now using the nemesis CLI we can run a command like:

$ nemesis preview

This gives us a preview of what nemesis will do when deploying this resource into Elasticsearch. Basically, it diffs the local object we just defined against the resource in Elasticsearch, if it exists. If it doesn’t exist, nemesis will simply create a new resource.

❯ nemesis preview
         Preview resources to be deployed          
┏━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━┓
┃ Resource ┃ Name       ┃ Action ┃           Diff ┃
┡━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━┩
│ Index    │ test-index │ create │ + ['settings'] │
└──────────┴────────────┴────────┴────────────────┘

Resources:
        Creating: 1
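
Conceptually, the preview step boils down to comparing the locally defined resource against whatever the cluster currently has. This is not nemesis's actual implementation, just a rough sketch of the idea in plain Python:

from typing import Optional

def preview_action(local: dict, remote: Optional[dict]) -> str:
    """Rough sketch only: decide what a deploy would do for one resource."""
    if remote is None:
        return "create"
    if local == remote:
        return "unchanged"
    return "update"

# The index above has no remote counterpart yet, so the action would be "create".
print(preview_action({"settings": {"number_of_replicas": "2"}}, None))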

And then finally to deploy our resource into Elasticsearch we’d just need to run the command:

$ nemesis launch

This runs a preview for you and then asks for confirmation that you really want to deploy the object into Elasticsearch.

Composability

Nemesis breaks each Elasticsearch resource down into sub-resources. This allows your nemesis projects to build small, composable objects which can be re-used when creating your Elasticsearch resources.

Before we go any further, let’s see an example. Creating ingest pipelines with Nemesis requires a few module imports, and then you construct your pipeline as if it were a regular Python object.

from nemesis.resources.elasticsearch import ingest_pipeline

pipeline = ingest_pipeline.IngestPipeline(
        id="test-pipeline",
        processors=[
            {"pipeline": {"name": "pipelineA"}},
            {"set": {"field": "outer_pipeline_set", "value": "outer"}},
        ],
)

In this simple example we’ve created an ingest pipeline called test-pipeline and added 2 processors to it:

  1. pipeline - executes another pipeline called pipelineA
  2. set - sets the field outer_pipeline_set to the value outer

This simple example illustrates how easily a pipeline can be defined using Python, but it doesn’t really show off the power of composable sub-resources.
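
For comparison, creating the same pipeline directly with the Python Elasticsearch client looks roughly like the sketch below (elasticsearch-py 7.x style, with placeholder connection details); this is the kind of raw call the nemesis resource object wraps up for you:

from elasticsearch import Elasticsearch

# Placeholder connection details, for illustration only.
client = Elasticsearch(cloud_id="my-deployment:...", http_auth=("elastic", "password"))

# The raw API equivalent of the IngestPipeline resource defined above.
client.ingest.put_pipeline(
    id="test-pipeline",
    body={
        "processors": [
            {"pipeline": {"name": "pipelineA"}},
            {"set": {"field": "outer_pipeline_set", "value": "outer"}},
        ]
    },
)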

Let’s take a look at a more complex example: creating an Elasticsearch Watch.

First let’s import our resources and define all the sub-resources of the Watch we want to create.

from nemesis.resources.elasticsearch import watcher 
from nemesis.resources.elasticsearch.query import QueryDSL

trigger = watcher.Trigger(schedule={"cron": "0 0/1 * * * ?"})
query = QueryDSL(bool={
    "must": {"match": {"response": 404}},
    "filter": {
        "range": {
            "@timestamp": {
                "from": "{{ctx.trigger.scheduled_time}}||-5m",
                "to": "{{ctx.trigger.triggered_time}}",
             }
         }
    },
})

search_body = watcher.Body(query=query)

search_request = watcher.SearchRequest(
        indices=["logstash*",],
        body=search_body,
)

input_search = watcher.Search(request=search_request)
input = watcher.Input(search=input_search)

condition = watcher.Condition(compare={"ctx.payload.hits.total": {"gt": 0}})

email_admin_action = {
    "email_admin": {
        "email": {
            "profile": "standard",
            "to": [
                "admin@domain.host.com",
            ],
            "subject": "404 recently encountered",
        }
    }
}

Now we can construct our Watch using all the sub-resources we just created.

watch1 = watcher.Watch(
        watch_id="test-watch",
        trigger=trigger,
        input=input,
        condition=condition,
        actions=email_admin_action,
)

Now, to demonstrate the composability of nemesis, let's assume we want to create a second watch which queries a different index but is otherwise the same.

search_request2 = watcher.SearchRequest(
        indices=["filebeat-*",],
        body=search_body,
)

# then we create a new `Search` object with the new SearchRequest.
input_search2 = watcher.Search(request=search_request2)

# then we define a new Input
input2 = watcher.Input(search=input_search2)

# finally we can define our new Watch, re-using all the same components
# from the previous watch.

watch2 = watcher.Watch(
        watch_id="test-watch",
        trigger=trigger,
        input=input2,
        condition=condition,
        actions=email_admin_action,
)

The nice thing about these resources, like the QueryDSL resource, is that they can be used in other Elasticsearch resources as well.

To further demonstrate composability, let’s assume we want to make a Transform whose source index uses the exact same query as our watches.

from nemesis.resources.elasticsearch import transform 

t = transform.Transform(
        id="test-transform",
        description="Maximum priced ecommerce data by customer_id in Asia",
        retention_policy=transform.RetentionPolicy(
            time=transform.RetentionPolicyTime(field="order_date", max_age="30d")
        ),
        # `dest_index` is an Index resource assumed to be defined elsewhere;
        # `pipeline` is the ingest pipeline from the earlier example.
        dest=transform.Dest(index=dest_index.id, pipeline=pipeline.id),
        pivot=transform.Pivot(
            group_by={"customer_id": {"terms": {"field": "customer_id"}}},
            aggregations={"max_price": {"max": {"field": "taxful_total_price"}}},
        ),
        sync=transform.Sync(
            time=transform.SyncTime(field="order_date", delay="60s"),
        ),
        frequency="5m",
        source=transform.Source(
            index=[
                "source_index_name",
            ],
            query=query,                      # from previous example 
        ),
)

This is a long example, but notice at the end that we set the query field of the Source object to the same query we defined in the previous example.

Transportability

Nemesis interacts with Elasticsearch using the Python Elasticsearch client, which means we can instantiate our Nemesis client using either a Cloud ID or an Elasticsearch host URL. Once our client is instantiated we can deploy objects to, and pull objects from, Elasticsearch.
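
Under the hood this is the standard elasticsearch-py client, so connecting by host URL instead of Cloud ID is the usual client call. Nemesis's own constructor arguments may differ slightly, so treat this as a sketch of the underlying client rather than the nemesis API:

from elasticsearch import Elasticsearch

# elasticsearch-py 7.x style; hypothetical host URL and credentials.
client = Elasticsearch(
    "https://es.staging.example.com:9243",
    http_auth=("elastic", "password"),
)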

By taking advantage of environment variables in a CI system, we can configure multiple CI jobs to point at different Elasticsearch environments.

Let's say we want to deploy our Elasticsearch index to 3 different environments: Production, Staging and QA.

We instantiate our Nemesis client from environment variables, and simply change those variables per environment.

import os
from nemesis import Nemesis

USERNAME, PASSWORD = os.environ.get("CLOUD_AUTH").split(":")
CLOUD_ID = os.environ.get("CLOUD_ID")

n = Nemesis(cloud_id=CLOUD_ID, username=USERNAME, password=PASSWORD)

Now when we run our preview or launch commands, we just adjust the environment variables at CLI invocation time.

$ CLOUD_AUTH="staging:password" CLOUD_ID="staging:cloud_id" nemesis preview
$ CLOUD_AUTH="qa:password" CLOUD_ID="qa:cloud_id" nemesis preview
$ CLOUD_AUTH="production:password" CLOUD_ID="production:cloud_id" nemesis preview

Pre/Post Deploy Hooks

Nemesis supports the ability to add pre and post deploy hooks to your Elasticsearch resources.

Why would you want to do this? Well, one great example is that ingest pipelines have a simulate API, which lets you test that your pipeline actually works the way you expect it to.

If you’re developing an ingest pipeline you’re most likely doing it through the Dev Console, running the simulate API over and over until you fine-tune it and get it working just the way you want. While this is doable, you should really be writing your ingest pipelines with a way to validate that they operate correctly. Tested, if you will.

To do this you can write your pipeline in Nemesis and then add a pre-deploy hook: any arbitrary function you define and pass to the pre_deploy parameter.

Let’s see an example. First, define a test function. This function takes 2 parameters:

  1. client - the Elasticsearch client
  2. ingest_pipeline - the resource being deployed (here, our ingest pipeline)

def simulate_pipeline(client, ingest_pipeline):
    # First define some docs to run through our pipeline.
    docs = [
        {"_source": {"source_index": "azure_20210701-20210731_08-05.11"}},
        {"_source": {"source_index": "azure_20210701-20210731_08-05.11"}},
        {"_source": {"source_index": "aws_20210701-20210731_08-05.11"}},
    ]

    # Next let's run the simulate method on the ingest pipeline.
    ret = ingest_pipeline.simulate(client, docs)

    # Now we can start to inspect the results.
    results = [r["processor_results"] for r in ret["docs"]]
    try:
        for result in results:
            assert len(result) == len(docs), f"Pipeline result had `{len(result)}` processor results. There should be {len(docs)} results."

            for processor_result in result:
                assert (processor_result["status"] == "success"), f"Pipeline processor `{processor_result['processor_type']}` failed simulate"
    except AssertionError as e:
        # Print the reason why it failed.
        print(e)
        # Return False to indicate the test failed.
        return False
    print("Simulate pipeline passed")
    return True

Next we want to create our resource and register it with the pre-deploy hook.

from nemesis.resources.elasticsearch.ingest_pipeline import IngestPipeline

ingest_pipeline = IngestPipeline(
    id="ingest_pipeline_name",
    processors=[
        {
            "split": {
                "field": "source_index",
                "separator": "_",
                "preserve_trailing": True,
            }
        },
        {"script": {"source": "ctx['cloud_provider'] = ctx['source_index'][0]"}},
        {"remove": {"field": "source_index"}},
    ],
    version=1,
)

n.register(ingest_pipeline, pre_deploy=simulate_pipeline)

This example covers creating an ingest pipeline, creating a function to test the results of that pipeline, and finally registering the resource with Nemesis with a pre_deploy hook.

How does this test work? We call the simulate API first and save the results of that API call in a variable called ret. Nested in that JSON blob, each doc has a processor_results field; we collect those into a list called results, and that list holds the values we want to test.

Finally, the test first checks, for each result in results, that the length of the result list equals the length of the docs list. Why is this important? We want to make sure every document went through the pipeline; if these numbers don’t match up, the pipeline didn’t process every document. Next we look at each processor_result within each result. This is the outcome of each processor defined in our ingest pipeline. So for each document, and for each processor in the pipeline, we check for a status of success.
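
For context, a verbose simulate response from Elasticsearch has roughly the shape sketched below, trimmed to just the fields the test reads (illustrative values only):

# Rough shape of `ret` for the three-processor pipeline defined above.
ret = {
    "docs": [
        {
            "processor_results": [
                {"processor_type": "split", "status": "success", "doc": {"...": "..."}},
                {"processor_type": "script", "status": "success", "doc": {"...": "..."}},
                {"processor_type": "remove", "status": "success", "doc": {"...": "..."}},
            ]
        },
        # ...one entry per document passed to simulate
    ]
}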

Nemesis is just Python

How is this useful? Imagine a scenario where a source of truth lives in a database. Let’s say, for example, you are ingesting data that includes some sort of AccountId. It’s not very useful to only see the AccountId in your Elasticsearch index; you also want to see the account name.

So you can create a Logstash pipeline that maps all the account IDs to the account names. This is really simple. But what happens when a new account is created? Someone has to go and update that Logstash pipeline with the new account ID and name. Whether this happens frequently or not, it always requires a person to change the Logstash pipeline and a person to deploy that change to Logstash.

With Python, it’s easier to solve this problem by querying the database where both the account name and account ID are stored. You can then dynamically build the Logstash pipeline with that mapping, update the pipeline in Elasticsearch using the Logstash API, and let Logstash download the updated pipeline as soon as it’s available.

import datetime

# `list_accounts` is an arbitrary helper that pulls account IDs and names
# and returns a list of dicts:
#   [
#       {"id": 123, "name": "someaccountname"},
#   ]
from utils import list_accounts

pipeline_string = """
input {
 pipeline {
    address => "account-mapping"
  }
}
filter {
  translate {
    field => "AccountId"
    destination => "AccountName"
    dictionary => {__ACCOUNT_MAP__}
  }
}
"""

# Create a list of strings that can be plopped into a logstash config
# to represent a `dictionary`

account_list = [f"\"{a['id']}\" => \"{a['name']}\"" for a in list_accounts()]
# String replace the `__ACCOUNT_MAP__` string with the list of strings
# we created in the step above

data = pipeline_string.replace("__ACCOUNT_MAP__", "\n".join(account_list))

# LogstashPipeline is the nemesis Logstash pipeline resource (import omitted here);
# `pipeline_settings` is assumed to be defined elsewhere in the project.
pipeline = LogstashPipeline(
    id="accounts-pipeline",
    pipeline=data,
    last_modified=datetime.datetime(2021, 12, 9),
    pipeline_metadata={},
    pipeline_settings=pipeline_settings,
    username="elastic",
)

n.register(pipeline)
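
To make the string substitution concrete, here's what the rendered translate dictionary entries look like for a couple of made-up accounts (hypothetical data, purely for illustration):

# Hypothetical accounts standing in for whatever list_accounts() returns.
sample_accounts = [{"id": 123, "name": "acme"}, {"id": 456, "name": "globex"}]
sample_map = "\n".join(f"\"{a['id']}\" => \"{a['name']}\"" for a in sample_accounts)
print(sample_map)
# "123" => "acme"
# "456" => "globex"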

Now, to keep our Logstash pipeline up to date, the next step is to automate how this nemesis project deploys itself. The simplest way would be a cron job. To deploy this nemesis project every day at 1 AM you could add the following to your crontab:

0 1 * * * cd /path/to/directory && nemesis launch -y

But this only works if your computer is on 24/7. You’re better off adding it to an automated deployment: a scheduled job in your CI system of choice that runs every day.

Conclusion

That’s a fairly complete overview of Nemesis. I feel like we’ve only scratched the surface, but it outlines a lot of what makes nemesis useful.
