How to write a Kinto plugin?

Kinto plugins let you add extra features to Kinto. Most notably:

  • Respond to internal events (e.g. notify third-party)
  • Add endpoints for custom URLs (e.g. new hook URL)
  • Add custom endpoint renderers (e.g. XML instead of JSON)

Kinto plugins are Python modules loaded on startup.

In this tutorial, we will build a plugin for ElasticSearch, a full-text search engine. The plugin will:

  • Initialize an indexer on startup;
  • Index the records when they’re created, updated, or deleted;
  • Add a new /{collection}/search endpoint.

Plugins are built using the Pyramid ecosystem.

Run ElasticSearch

We will run a local install of ElasticSearch on localhost:9200.

Using Docker it is pretty straightforward:

sudo docker run -p 9200:9200 elasticsearch

It can also be installed manually using the official instructions.

Include me

First, create a Python package and install it locally. For example:

$ pip install cookiecutter
$ cookiecutter gh:kragniz/cookiecutter-pypackage-minimal

[...]

$ cd kinto_elasticsearch
$ python setup.py develop

In order to be included, a package must define an includeme(config) function.

For example, in kinto_elasticsearch/__init__.py:

def includeme(config):
    print("I am the ElasticSearch plugin!")

Add it to the config.ini file:

kinto.includes = kinto_elasticsearch

Our message should now appear when Kinto starts.
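
To picture roughly what happens behind kinto.includes, the sketch below mimics the include step with the standard library only: it imports each listed module by name and calls its includeme(config). This is a simplified stand-in, not Pyramid's actual implementation; include_plugins, FakeConfig and the demo_plugin module are hypothetical names made up for the illustration.

```python
import importlib
import sys
import types


class FakeConfig:
    """Hypothetical stand-in for Pyramid's configurator object."""

    def __init__(self):
        self.loaded = []


def include_plugins(config, includes):
    """Import each whitespace-separated module name and call its includeme()."""
    for name in includes.split():
        module = importlib.import_module(name)
        module.includeme(config)


# Register a throwaway module so the sketch runs without a real package.
demo = types.ModuleType("demo_plugin")


def _includeme(config):
    config.loaded.append("demo_plugin")


demo.includeme = _includeme
sys.modules["demo_plugin"] = demo

config = FakeConfig()
include_plugins(config, "demo_plugin")
print(config.loaded)
```

The point to retain is that a plugin is nothing more than an importable module exposing includeme(config).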

Simple indexer

Let’s define a simple indexer class in kinto_elasticsearch/indexer.py. It can search and index records, using the official Python package:

$ pip install elasticsearch

It is essentially a thin wrapper around the client, and the code is kept deliberately simple for this tutorial:

import elasticsearch

class Indexer:
    def __init__(self, hosts):
        self.client = elasticsearch.Elasticsearch(hosts)

    def search(self, bucket_id, collection_id, query, **kwargs):
        indexname = '%s-%s' % (bucket_id, collection_id)
        return self.client.search(index=indexname,
                                  doc_type=indexname,
                                  body=query,
                                  **kwargs)

    def index_record(self, bucket_id, collection_id, record, id_field='id'):
        indexname = '%s-%s' % (bucket_id, collection_id)
        if not self.client.indices.exists(index=indexname):
            self.client.indices.create(index=indexname)

        record_id = record[id_field]
        index = self.client.index(index=indexname,
                                  doc_type=indexname,
                                  id=record_id,
                                  body=record,
                                  refresh=True)
        return index

    def unindex_record(self, bucket_id, collection_id, record, id_field='id'):
        indexname = '%s-%s' % (bucket_id, collection_id)
        record_id = record[id_field]
        result = self.client.delete(index=indexname,
                                    doc_type=indexname,
                                    id=record_id,
                                    refresh=True)
        return result

And a simple method to load from configuration:

from pyramid.settings import aslist

def load_from_config(config):
    settings = config.get_settings()
    hosts = aslist(settings.get('elasticsearch.hosts', 'localhost:9200'))
    indexer = Indexer(hosts=hosts)
    return indexer
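
The aslist helper is what lets elasticsearch.hosts hold several hosts separated by spaces or newlines. The function below is a standard-library approximation of that splitting behaviour, written for illustration only; split_hosts is a hypothetical name, the host names are made up, and the real helper remains pyramid.settings.aslist.

```python
def split_hosts(value):
    """Approximate pyramid.settings.aslist: split a setting on any whitespace."""
    if isinstance(value, str):
        return value.split()
    # Already a sequence (e.g. set programmatically): return a list copy.
    return list(value)


# A single host, as in the default value used by load_from_config().
print(split_hosts("localhost:9200"))

# Several hosts on separate lines, as they could appear in an .ini file.
print(split_hosts("es1.example.org:9200\n    es2.example.org:9200"))
```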

Initialize on startup

We now need to initialize the indexer when Kinto starts. It happens in the includeme() function.

from . import indexer

def includeme(config):
    # Register a global indexer object
    config.registry.indexer = indexer.load_from_config(config)

Add a search view

Add an endpoint definition in kinto_elasticsearch/views.py:

import logging

from kinto.core import Service

logger = logging.getLogger(__name__)

search = Service(name="search",
                 path='/buckets/{bucket_id}/collections/{collection_id}/search',
                 description="Search")

@search.post()
def get_search(request):
    bucket_id = request.matchdict['bucket_id']
    collection_id = request.matchdict['collection_id']

    query = request.body

    # Access indexer from views using registry.
    indexer = request.registry.indexer
    try:
        results = indexer.search(bucket_id, collection_id, query)
    except Exception as e:
        logger.exception(e)
        results = {}
    return results
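
The view forwards the raw request body to ElasticSearch, so clients are expected to post a query written in the ElasticSearch query DSL. As an illustration, this snippet builds such a body with a match query on the note field; the field name is taken from the record used later in this tutorial, and the search term is only an example.

```python
import json

# A full-text "match" query on the "note" field of indexed records.
query = {
    "query": {
        "match": {
            "note": "kinto"
        }
    }
}

# This JSON string is what a client would send as the POST body.
body = json.dumps(query)
print(body)
```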

Enable the view:

from . import indexer

def includeme(config):
    # Register a global indexer object
    config.registry.indexer = indexer.load_from_config(config)

    # Activate end-points.
    config.scan('kinto_elasticsearch.views')

The new URL should now be accessible, but it returns no results yet:

$ http POST http://localhost:8888/v1/buckets/example/collections/notes/search
HTTP/1.1 200 OK
Access-Control-Expose-Headers: Retry-After, Content-Length, Alert, Backoff
Content-Length: 2
Content-Type: application/json; charset=UTF-8
Date: Wed, 20 Jan 2016 12:01:50 GMT
Server: waitress

{}

Index records on change

When records change, we index them. When they are deleted, we unindex them.

Let’s define a function on_resource_changed() that will be called when an action is performed on records.

def on_resource_changed(event):
    indexer = event.request.registry.indexer

    resource_name = event.payload['resource_name']

    if resource_name != "record":
        return

    bucket_id = event.payload['bucket_id']
    collection_id = event.payload['collection_id']

    action = event.payload['action']
    for change in event.impacted_records:
        if action == 'delete':
            indexer.unindex_record(bucket_id,
                                   collection_id,
                                   record=change['old'])
        else:
            indexer.index_record(bucket_id,
                                 collection_id,
                                 record=change['new'])
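
The callback can be exercised without a running Kinto or ElasticSearch by feeding it a stub event. The sketch below repeats the function so that it runs on its own; RecordingIndexer and make_event are hypothetical test helpers, and the stub only mimics the attributes the callback reads.

```python
from types import SimpleNamespace


def on_resource_changed(event):
    # Same logic as the callback above, repeated so the snippet is standalone.
    indexer = event.request.registry.indexer
    if event.payload['resource_name'] != "record":
        return
    bucket_id = event.payload['bucket_id']
    collection_id = event.payload['collection_id']
    for change in event.impacted_records:
        if event.payload['action'] == 'delete':
            indexer.unindex_record(bucket_id, collection_id, record=change['old'])
        else:
            indexer.index_record(bucket_id, collection_id, record=change['new'])


class RecordingIndexer:
    """Hypothetical stub that records calls instead of talking to ElasticSearch."""

    def __init__(self):
        self.calls = []

    def index_record(self, bucket_id, collection_id, record):
        self.calls.append(("index", bucket_id, collection_id, record["id"]))

    def unindex_record(self, bucket_id, collection_id, record):
        self.calls.append(("unindex", bucket_id, collection_id, record["id"]))


def make_event(indexer, action, resource_name, changes):
    """Build a stub exposing only the attributes the callback reads."""
    registry = SimpleNamespace(indexer=indexer)
    payload = {"resource_name": resource_name, "action": action,
               "bucket_id": "example", "collection_id": "notes"}
    return SimpleNamespace(request=SimpleNamespace(registry=registry),
                           payload=payload, impacted_records=changes)


stub = RecordingIndexer()
on_resource_changed(make_event(stub, "create", "record", [{"new": {"id": "abc"}}]))
on_resource_changed(make_event(stub, "delete", "record", [{"old": {"id": "abc"}}]))
# Events about other resources (here a bucket) are ignored by the callback.
on_resource_changed(make_event(stub, "create", "bucket", [{"new": {"id": "example"}}]))
print(stub.calls)
```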

We then bind this function to the Kinto core events:

from kinto.core.events import ResourceChanged

from . import indexer

def includeme(config):
    # Register a global indexer object
    config.registry.indexer = indexer.load_from_config(config)

    # Activate end-points.
    config.scan('kinto_elasticsearch.views')

    # Plug the callback with resource events.
    config.add_subscriber(on_resource_changed, ResourceChanged)

Declare API capabilities

Arbitrary capabilities can be declared and exposed in the root URL.

Clients can rely on this to detect optional features on the server, like our indexer!

from kinto.core.events import ResourceChanged

from . import indexer

def includeme(config):
    # Register a global indexer object
    config.registry.indexer = indexer.load_from_config(config)

    # Activate end-points.
    config.scan('kinto_elasticsearch.views')

    # Plug the callback with resource events.
    config.add_subscriber(on_resource_changed, ResourceChanged)

    config.add_api_capability("indexed_search",
                              description="Search records using ElasticSearch",
                              url="https://my-super-indexer-for-kinto.org")

Note

Any argument passed to config.add_api_capability() will be exposed in the root URL.
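
On the client side, feature detection then amounts to looking the name up in the capabilities object of the root URL response. A minimal sketch, using a hand-written sample of that response rather than a live server; has_capability is a hypothetical helper name.

```python
def has_capability(server_info, name):
    """Return True if the root URL payload advertises the given capability."""
    return name in server_info.get("capabilities", {})


# Hand-written sample of the relevant part of the root URL response.
server_info = {
    "capabilities": {
        "indexed_search": {
            "description": "Search records using ElasticSearch",
            "url": "https://my-super-indexer-for-kinto.org"
        }
    }
}

print(has_capability(server_info, "indexed_search"))
print(has_capability(server_info, "some_other_feature"))
```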

Default configuration and environment variables

A helper reads configuration values from environment variables and falls back to default values when they are not set.

from kinto.core import load_default_settings
from kinto.core.events import ResourceChanged

from . import indexer

DEFAULT_SETTINGS = {
    'elasticsearch.refresh_enabled': False
}

def includeme(config):
    # Load settings from environment and apply defaults.
    load_default_settings(config, DEFAULT_SETTINGS)

    # Register a global indexer object
    config.registry.indexer = indexer.load_from_config(config)

    # Activate end-points.
    config.scan('kinto_elasticsearch.views')

    # Plug the callback with resource events.
    config.add_subscriber(on_resource_changed, ResourceChanged)

    config.add_api_capability("indexed_search",
                              description="Search records using ElasticSearch",
                              url="https://my-super-indexer-for-kinto.org")

In this example, if the environment variable KINTO_ELASTICSEARCH_REFRESH_ENABLED is set to true, it will override the setting kinto.elasticsearch.refresh_enabled from the .ini file.
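
The naming convention and precedence described here can be pictured as follows. This is a simplified stand-in for illustration, not the code of load_default_settings; env_name and resolve_setting are hypothetical names, and the kinto prefix is assumed from the variable name above.

```python
def env_name(setting, prefix="kinto"):
    """Map 'elasticsearch.refresh_enabled' to 'KINTO_ELASTICSEARCH_REFRESH_ENABLED'."""
    return ("%s.%s" % (prefix, setting)).upper().replace(".", "_")


def resolve_setting(setting, environ, ini_settings, defaults):
    """Pick a value: environment first, then the .ini file, then the default."""
    variable = env_name(setting)
    if variable in environ:
        return environ[variable]
    if setting in ini_settings:
        return ini_settings[setting]
    return defaults[setting]


defaults = {"elasticsearch.refresh_enabled": False}
ini_settings = {"elasticsearch.refresh_enabled": "false"}
environ = {"KINTO_ELASTICSEARCH_REFRESH_ENABLED": "true"}

print(env_name("elasticsearch.refresh_enabled"))
# The environment wins over the .ini value.
print(resolve_setting("elasticsearch.refresh_enabled", environ, ini_settings, defaults))
# With nothing set anywhere, the default applies.
print(resolve_setting("elasticsearch.refresh_enabled", {}, {}, defaults))
```

Values are returned here as raw strings when they come from the environment or the .ini file; this sketch does not attempt any type conversion.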

Test it all together

We’re almost done! Now, let’s check if it works properly.

Create a bucket and collection:

$ http --auth alice:s3cr3t --verbose PUT http://localhost:8888/v1/buckets/example
$ http --auth alice:s3cr3t --verbose PUT http://localhost:8888/v1/buckets/example/collections/notes

Add a new record:

$ echo '{"data": {"note": "kinto"}}' | http --auth alice:s3cr3t --verbose POST http://localhost:8888/v1/buckets/example/collections/notes/records

It should now be possible to search for it:

$ http --auth alice:s3cr3t --verbose POST http://localhost:8888/v1/buckets/example/collections/notes/search
HTTP/1.1 200 OK
Access-Control-Expose-Headers: Retry-After, Content-Length, Alert, Backoff
Content-Length: 333
Content-Type: application/json; charset=UTF-8
Date: Wed, 20 Jan 2016 12:02:05 GMT
Server: waitress

{
    "_shards": {
        "failed": 0,
        "successful": 5,
        "total": 5
    },
    "hits": {
        "hits": [
            {
                "_id": "453ff779-e967-4b08-99b9-5c16af865a67",
                "_index": "example-assets",
                "_score": 1.0,
                "_source": {
                    "id": "453ff779-e967-4b08-99b9-5c16af865a67",
                    "last_modified": 1453291301729,
                    "note": "kinto"
                },
                "_type": "example-assets"
            }
        ],
        "max_score": 1.0,
        "total": 1
    },
    "timed_out": false,
    "took": 20
}
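
Clients usually only need the matching records, which sit under hits.hits[]._source in the response. A small sketch that pulls them out of a response shaped like the one above; extract_records is a hypothetical helper name, and the sample is trimmed to the fields it reads.

```python
def extract_records(response):
    """Return the original records from an ElasticSearch search response."""
    hits = response.get("hits", {}).get("hits", [])
    return [hit["_source"] for hit in hits]


# Trimmed copy of the response shown above.
response = {
    "hits": {
        "hits": [
            {
                "_id": "453ff779-e967-4b08-99b9-5c16af865a67",
                "_score": 1.0,
                "_source": {
                    "id": "453ff779-e967-4b08-99b9-5c16af865a67",
                    "last_modified": 1453291301729,
                    "note": "kinto"
                }
            }
        ],
        "total": 1
    }
}

records = extract_records(response)
print([record["note"] for record in records])

# The view returns an empty object on errors, which yields no records.
print(extract_records({}))
```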

Going further

This plugin implements only the basic functionality. To become a first-class plugin, it would need to:

  • Check that the user has read permission on the collection before searching
  • Create the index when the collection is created
  • Create a mapping if the collection has a JSON schema
  • Delete the index when the bucket or collection is deleted

If you feel like doing it, we would be very glad to help you!

More documentation

Some more details about Kinto internals are given in the Kinto core docs!