Build your own Beat

Beats is the platform for building lightweight, open source data shippers that send all kinds of data to Elasticsearch to be analyzed later. We have Packetbeat for monitoring the network traffic exchanged between your servers, Filebeat for shipping the logs from your servers, and the newly released Metricbeat, which periodically fetches metrics from external systems. If you need to collect other custom data, you can easily build your own Beat based on the libbeat framework. The community has already built more than 25 Community Beats.

We provide the Beat Generator package to help you create your own Beat. In this blog post, you will see how to create a Beat using the Beat Generator. The Beat we build today for practice is lsbeat, which indexes information about files and directories, similar to the Unix command ls. This article assumes a Unix system; if you use Windows or another OS, adapt the instructions to your OS.

Step 1 - Setup your Golang Environment

Beats are written in Golang, so Golang must be installed on your machine to create and develop a Beat. Follow the guide here to install Golang. Beats currently require at least Golang 1.6. Make sure you set up your $GOPATH variable properly.

Let's look at the code we will use for Lsbeat. This is a simple Golang program that takes a directory as a command-line argument and lists all files and subdirectories under it.

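package main
import (
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
)
func main() {
    // Default to the current directory when no argument is given.
    if len(os.Args) == 1 {
        listDir(".")
    } else {
        listDir(os.Args[1])
    }
}
// listDir prints name, path, directory flag, modification time and size
// for every entry under dirFile, recursing into subdirectories.
func listDir(dirFile string) {
    files, err := ioutil.ReadDir(dirFile)
    if err != nil {
        // Report unreadable directories instead of silently skipping them.
        fmt.Fprintln(os.Stderr, err)
        return
    }
    for _, f := range files {
        t := f.ModTime()
        path := filepath.Join(dirFile, f.Name())
        fmt.Println(f.Name(), path, f.IsDir(), t, f.Size())
        if f.IsDir() {
            listDir(path)
        }
    }
}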

We will reuse the code of the listDir function.

Step 2 - Generate

To generate our own Beat we use the Beat Generator. First you must install cookiecutter; check out the installation guide here. After installing cookiecutter, decide on a name for the Beat. The name must be a single, all-lowercase word. In this example we use lsbeat.

To create the Beat skeleton, you need the Beat Generator package, which is available in the beats repository. With Golang installed, you can download it using the go get command. The command downloads all source files under the $GOPATH/src path.

$ go get github.com/elastic/beats

To work on a stable branch, check out the specific branch.

$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1

Now create and change to your own directory under $GOPATH, and run cookiecutter with the Beat Generator path.

$ cd $GOPATH/src/github.com/{user}
$ cookiecutter $GOPATH/src/github.com/elastic/beats/generate/beat

Cookiecutter will ask you several questions. For project_name enter lsbeat, and for github_name enter your GitHub ID. The next two questions, beat and beat_path, should already be set correctly. For the last one, full_name, enter your first and last name.

project_name [Examplebeat]: lsbeat
github_name [your-github-name]: {username}
beat [lsbeat]:
beat_path [github.com/{github id}]:
full_name [Firstname Lastname]: {Full Name}

This should have created a directory lsbeat inside our folder with several files. Let's change to this directory and list the files that were generated.

$ cd lsbeat
$ tree
.
├── CONTRIBUTING.md
├── LICENSE
├── Makefile
├── README.md
├── beater
│   └── lsbeat.go
├── config
│   ├── config.go
│   └── config_test.go
├── dev-tools
│   └── packer
│       ├── Makefile
│       ├── beats
│       │   └── lsbeat.yml
│       └── version.yml
├── docs
│   └── index.asciidoc
├── etc
│   ├── beat.yml
│   └── fields.yml
├── glide.yaml
├── lsbeat.template.json
├── main.go
├── main_test.go
└── tests
    └── system
        ├── config
        │   └── lsbeat.yml.j2
        ├── lsbeat.py
        ├── requirements.txt
        └── test_base.py

We now have a raw template of the Beat, but we still need to fetch the dependencies and set up the git repository.

First fetch the dependencies, which in our case consist only of libbeat, and create the basic config and template files. We will take a closer look at the template and config files later.

$ make setup

Once you have your own Beat, start sharing it with the community by uploading it to a GitHub repository.


To push lsbeat to the GitHub repository, run the following commands:

$ git remote add origin git@github.com:{username}/lsbeat.git
$ git push -u origin master

Now we have a complete Beat and have pushed the first version to GitHub. Let's build and run our Beat and then dig deeper into the code.

Step 3 - Configure

Running the commands above automatically creates the lsbeat.yml and lsbeat.template.json files, and all the basic configuration is already written in them.

lsbeat.yml:

lsbeat:
  # Defines how often an event is sent to the output
  period: 1s

period is a parameter that the generator includes in every Beat. It makes Lsbeat repeat its work every second. Let's change the period from 1 to 10 seconds and add a new path parameter, which holds the top-level directory the program will scan. We add these parameters to beat.yml in the etc/ directory.

lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "."

After adding the new parameters, run the make update command to apply the changes to the lsbeat.yml configuration file. The new parameters we set in etc/beat.yml are now available in lsbeat.yml.

$ make update
$ cat lsbeat.yml
################### Lsbeat Configuration Example #########################
############################# Lsbeat ######################################
lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "."
###############################################################################

After updating the configuration files, edit config/config.go to add the path parameter.

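package config
import "time"
// Config holds the lsbeat settings read from lsbeat.yml.
type Config struct {
    Period time.Duration `config:"period"` // how often to scan
    Path   string        `config:"path"`   // top-level directory to scan
}
// DefaultConfig supplies the values for settings missing from lsbeat.yml.
var DefaultConfig = Config{
    Period: 10 * time.Second,
    Path:   ".",
}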

These defaults set period to 10 seconds and path to the current directory (.).

Step 4 - Add code

Each Beat needs to implement the Beater interface by defining the Run() and Stop() functions. A more detailed guide to the Beater interface is available here.

To do that, define a struct named Lsbeat that implements the Beater interface. We also add a lastIndexTime field, which stores the timestamp of the last scan.

type Lsbeat struct {
    done          chan struct{}
    config        config.Config
    client        publisher.Client
    lastIndexTime time.Time // start time of the previous scan
}

In addition, each Beat needs to implement the New() function that receives the Beat configuration and returns the Beat object of type Lsbeat.

func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }
    ls := &Lsbeat{
        done:   make(chan struct{}),
        config: config,
    }
    return ls, nil
}

In the case of Lsbeat, we want to extend the default Run() function to export information about the files and subdirectories available in a directory.

Before modifying the Run() function, let's add a listDir() function at the bottom of the lsbeat.go file to collect information about files and directories. It generates events that include:

  • "@timestamp": common.Time(time.Now())
  • "type": beatname
  • "modtime": common.Time(t)
  • "filename": f.Name()
  • "path": dirFile + "/" + f.Name()
  • "directory": f.IsDir()
  • "filesize": f.Size()

On the first run it indexes all files and directories; on every later run it indexes only the files and directories created or modified since the previous run started. The timestamp of the previous run is saved in the lastIndexTime variable.

func (bt *Lsbeat) listDir(dirFile string, beatname string) {
    files, err := ioutil.ReadDir(dirFile)
    if err != nil {
        logp.Err("Failed to read directory %s: %v", dirFile, err)
        return
    }
    for _, f := range files {
        t := f.ModTime()
        path := filepath.Join(dirFile, f.Name())
        // Publish only entries changed since the previous scan started.
        if t.After(bt.lastIndexTime) {
            event := common.MapStr{
                "@timestamp": common.Time(time.Now()),
                "type":       beatname,
                "modtime":    common.Time(t),
                "filename":   f.Name(),
                "path":       path,
                "directory":  f.IsDir(),
                "filesize":   f.Size(),
            }
            bt.client.PublishEvent(event)
        }
        // Always descend, since a subdirectory may contain changed files.
        if f.IsDir() {
            bt.listDir(path, beatname)
        }
    }
}

Don't forget to add the io/ioutil and path/filepath packages to the imports.

import (
    "fmt"
    "io/ioutil"
    "path/filepath"
    "time"
    // keep the libbeat and config imports already in the generated file
)

Now let's look at the Run() function, which calls listDir() and saves the timestamp in the lastIndexTime variable.

func (bt *Lsbeat) Run(b *beat.Beat) error {
    logp.Info("lsbeat is running! Hit CTRL-C to stop it.")
    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    defer ticker.Stop() // release the ticker when Run returns
    for {
        now := time.Now()
        bt.listDir(bt.config.Path, b.Name) // publish new or modified entries
        bt.lastIndexTime = now             // next scan only picks up later changes
        logp.Info("Event sent")
        select {
        case <-bt.done: // Stop() was called
            return nil
        case <-ticker.C: // wait for the next period
        }
    }
}

The Stop() function breaks the run loop; it is the same as the generated one:

func (bt *Lsbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}

We are almost done with coding. We still have to add the new fields to the mapping, so add the field information to the etc/fields.yml file.

- key: lsbeat
  title: LS Beat
  description: 
  fields:
    - name: counter
      type: integer
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    # New fields added by lsbeat
    - name: modtime
      type: date
    - name: filename
      type: text
    - name: path
    - name: directory
      type: boolean
    - name: filesize
      type: long

Then apply the updates:

$ make update

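The filename field will be analyzed with an nGram tokenizer, so let's add a custom analyzer to the lsbeat.template.json file, under "settings". A custom analyzer only takes effect for fields whose mapping references it, so the filename mapping (in the elided "mappings" section) also needs "analyzer": "ls_ngram_analyzer".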

{
  "mappings": {
        ...
  },
  "order": 0,
  "settings": {
    "index.refresh_interval": "5s",
    "analysis": {
      "analyzer": {
        "ls_ngram_analyzer": {
          "tokenizer": "ls_ngram_tokenizer"
        }
      },
      "tokenizer": {
        "ls_ngram_tokenizer": {
          "type": "ngram",
          "min_gram": "2",
          "token_chars": [
            "letter",
            "digit"
          ]
        }
      }
    }
  },
  "template": "lsbeat-*"
}

Step 5 - Build and run

Now we can build and run. Just run the make command; it compiles the code and builds the lsbeat runnable binary (lsbeat.exe on Windows).

$ make

Modify the lsbeat.yml file to set the root directory for listing files. In our case, we set it to our $GOPATH, which is /Users/ec2-user/go. Make sure you use the full path of the directory.

lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "/Users/ec2-user/go"

Also make sure Elasticsearch and Kibana are running. Let's run Lsbeat and see what happens.

$ ./lsbeat

You can use the _cat API to check whether the index was created and the data was indexed properly.

We can see the lsbeat-2016.06.03 index and its document count. Now let's query Lsbeat on the filename field, which is analyzed with the nGram tokenizer, using the keyword lsbe.
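The query can also be sent from code. This sketch assumes Elasticsearch on localhost:9200, the lsbeat-* index pattern, and a filename mapping that uses ls_ngram_analyzer; it builds a standard match query body with encoding/json and posts it to the _search endpoint. matchQuery is a hypothetical helper.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
)

// matchQuery builds {"query":{"match":{"filename":term}}}. The match query
// analyzes term with the field's analyzer before searching.
func matchQuery(term string) ([]byte, error) {
	q := map[string]interface{}{
		"query": map[string]interface{}{
			"match": map[string]interface{}{"filename": term},
		},
	}
	return json.Marshal(q)
}

func main() {
	body, err := matchQuery("lsbe")
	if err != nil {
		panic(err)
	}
	resp, err := http.Post("http://localhost:9200/lsbeat-*/_search",
		"application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer resp.Body.Close()
	out, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(string(out))
}
```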


It works! Congratulations, you just built your own Beat.