grove.processors package

Provides processors for collected log entries.

class grove.processors.BaseProcessor(config: Dict[str, Any])[source]

Bases: ABC

Provides an abstract base processor which all processors must inherit from.

class Configuration(*, name: str, processor: str)[source]

Bases: ProcessorConfig

Defines the required configuration and validators for the processor.

finalize()[source]

Performs a final set of operations after logs have been saved.

process(entry: Dict[str, Any]) List[Dict[str, Any]][source]

Performs a set of processes against a log entry.

Parameters:

entry – A collected log entry.

Returns:

The processed log entry in a list. If only a single entry is required the list should contain a single element. If the log entry is to be dropped, an empty list should be used.

Submodules

grove.processors.extract_paths module

Grove processor to extract and map fields using JMESPaths.

This processor is intended to be used to transform raw log entries into a common schema. This is especially useful for ensuring that all collected log entries from differing upstream vendors are in a consistent format - whether industry standard, or bespoke.

class grove.processors.extract_paths.Handler(config: Dict[str, Any])[source]

Bases: BaseProcessor

Extract and map fields using JMESPaths.

class Configuration(*, name: str, processor: str, raw: str | None = None, fields: List[Mapping])[source]

Bases: ProcessorConfig

Expresses the configuration and associated validators for the processor.

fields: List[Mapping]
raw: str | None
process(entry: Dict[str, Any]) List[Dict[str, Any]][source]

Attempt to extract and map fields from the log entry.

Parameters:

entry – A collected log entry.

Returns:

The processed log entry with fields mapped, as a list.

class grove.processors.extract_paths.Mapping(*, destination: str, sources: List[str] = [], static: str | None = None)[source]

Bases: BaseModel

Expresses the configuration fields used to specify path mapping.

destination: str
sources: List[str]
static: str | None
classmethod static_or_sources(value, values)[source]

Ensures that either sources or static is set, not both.

grove.processors.filter_entries module

Grove processor to filter (delete) entire log entries based JMESPath queries.

This processor is intended to allow dropping of log entries when a set of criteria is met. This may be used to assist in reducing outputting noisy log entries where a given vendor does not provide a mechanism for filtering events.

class grove.processors.filter_entries.Handler(config: Dict[str, Any])[source]

Bases: BaseProcessor

Filter (delete) log entries based on JMESPath queries.

If any of the configured filters match a given log entry (return True), then the log entry will be dropped. Queries are evaluated against log entries in the order that they are defined, and the ‘first match wins’.

class Configuration(*, name: str, processor: str, filters: List[str])[source]

Bases: ProcessorConfig

Expresses the configuration and associated validators for the processor.

filters: List[str]
process(entry: Dict[str, Any]) List[Dict[str, Any]][source]

Drop log entries which do not match all of the configured JMESPath queries.

Parameters:

entry – A collected log entry.

Returns:

The processed log entry, or an empty array if the log entry should be dropped.

grove.processors.filter_paths module

Grove processor to filter (delete) fields from log entries based on provided paths.

This processor is intended to allow removal of superfluous or duplicated data from log entries. This may be used after a processing stage to remove the original source data, or used to prune down a log entry from a particularly verbose vendor.

class grove.processors.filter_paths.Handler(config: Dict[str, Any])[source]

Bases: BaseProcessor

Filter (delete) fields from log entries based on provided paths.

class Configuration(*, name: str, processor: str, sources: List[str])[source]

Bases: ProcessorConfig

Expresses the configuration and associated validators for the processor.

sources: List[str]
process(entry: Dict[str, Any]) List[Dict[str, Any]][source]

Attempt to drop a configured field from the log entry.

Parameters:

entry – A collected log entry.

Returns:

The processed log entry, with fields dropped.

grove.processors.split_path module

Grove processor to split a log entry into N log entries by the specified JMESPath.

This processor is intended to allow “fanning-out” a single log entry which contains several related operations into distinct log entries per item. The remainder of the log entry outside of the split path will not be modified.

It is important to note that the list of elements which is fanned-out will be converted into a dictionary, rather than a list.

As an example, in following sample:

{
“events”: [

{“name”: “First”, “value”: 1}, {“name”: “Second”, “value”: 2},

]

}

After splitting based on “events”, two records would be generated, which contain the following:

{

“events”: {“name”: “First”, “value”: 1},

}

{

“events”: {“name”: “Second”, “value”: 2},

}

class grove.processors.split_path.Handler(config: Dict[str, Any])[source]

Bases: BaseProcessor

Split a log entry into N log entries by the specified JMESPath.

class Configuration(*, name: str, processor: str, source: str)[source]

Bases: ProcessorConfig

Expresses the configuration and associated validators for the processor.

source: str
process(entry: Dict[str, Any]) List[Dict[str, Any]][source]

Attempt to extract and map fields from the log entry.

Parameters:

entry – A collected log entry.

Returns:

The processed log entry.

grove.processors.zip_paths module

Grove processor to zip two sets of data into a dictionary of key / value pairs.

This processor is useful for transforming lists of key / value pairs, where the key and the value of a desired set of data. This processor is useful for flattening key / value data from sources such as Google Workspace activity logs.

In line with this Google Workspace example, data from Google may appear as follows:

“parameters”: [

{“name”: “owner”, “value”: “a-user@example.org”}, {“name”: “visibility”, “value”: “private”}

]

Unfortunately, this data can be hard to work with in many SIEMs and search indexes as it is expressed in the raw log entries. As a result, this processor may be used to instead ‘flatten’ this key / value data into a dictionary which is keyed by the extracted value of “name”, and uses the value from the “value” field:

“parameters”: {

“owner”: “a-user@example.org”, “visibility”: “private”

}

Making this data considerably easier to work with during creation of indexes, and creation of detection content.

class grove.processors.zip_paths.Handler(config: Dict[str, Any])[source]

Bases: BaseProcessor

Extract and map fields using JMESPaths.

class Configuration(*, name: str, processor: str, source: str, key: str, values: List[str] = [])[source]

Bases: ProcessorConfig

Expresses the configuration and associated validators for the processor.

key: str
source: str
values: List[str]
process(entry: Dict[str, Any]) List[Dict[str, Any]][source]

Extract and zip configured paths, replacing the source.

Parameters:

entry – A collected log entry.

Returns:

The processed log entry with fields zipped.