# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

"""Grove processor to zip two sets of data into a dictionary of key / value pairs.

This processor is useful for transforming lists of key / value pairs, where the key and
the value of each pair are expressed as separate fields of a list element. This
processor is useful for flattening key / value data from sources such as Google
Workspace activity logs.

In line with this Google Workspace example, data from Google may appear as follows:

    "parameters": [
        {"name": "owner", "value": "a-user@example.org"},
        {"name": "visibility", "value": "private"}
    ]

Unfortunately, this data can be hard to work with in many SIEMs and search indexes as it
is expressed in the raw log entries. As a result, this processor may be used to instead
'flatten' this key / value data into a dictionary which is keyed by the extracted value
of "name", and uses the value from the "value" field:

    "parameters": {
        "owner": "a-user@example.org",
        "visibility": "private"
    }

This makes the data considerably easier to work with during creation of indexes, and
creation of detection content.
"""

from typing import Any, Dict, List

import jmespath
from pydantic import Extra

from grove.helpers import parsing
from grove.models import ProcessorConfig
from grove.processors import BaseProcessor
class Handler(BaseProcessor):
    """Extract and map fields using JMESPaths."""

    class Configuration(ProcessorConfig, extra=Extra.forbid):
        """Expresses the configuration and associated validators for the processor."""

        # JMESPath pointing at the data which is to be zipped.
        source: str

        # JMESPath yielding the dictionary key for each element found under the
        # source. It is evaluated against each element, so it must be written
        # relative to the source rather than as an absolute path.
        key: str

        # JMESPaths yielding the dictionary value for each element found under the
        # source. They are tried in the order given and the first one which produces
        # a result is used. As with the key, these are relative to the source, not
        # absolute.
        values: List[str] = []

    def process(self, entry: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Zip the configured key / value paths into a dictionary at the source path.

        Each element found under the configured source contributes at most one key /
        value pair. Elements without a key, or without a value on any of the
        configured value paths, contribute nothing. Where several elements yield the
        same key, the last one seen is kept.

        :param entry: A collected log entry.

        :return: A single element list containing the processed log entry. If the
            source path cannot be found the entry is returned as it was received.
        """
        config = self.configuration

        # A record without the source field is handed straight back rather than
        # dropped, as there is simply nothing to zip.
        located = jmespath.search(config.source, entry)
        if located is None:
            return [entry]

        # Anything other than a list is wrapped, so that iteration is always possible.
        elements: List[Any] = located if isinstance(located, list) else [located]

        zipped: Dict[str, Any] = {}
        for element in elements:
            # Elements lacking a key cannot be mapped.
            name = jmespath.search(config.key, element)
            if name is None:
                continue

            # Value paths are evaluated lazily and in order, stopping at the first
            # which yields something. If none do, the element is left out.
            candidates = (jmespath.search(path, element) for path in config.values)
            found = next((hit for hit in candidates if hit is not None), None)
            if found is not None:
                zipped[name] = found

        # Map the zipped data over the top of the original source field.
        source_path = parsing.quote_aware_split(config.source)
        updated = parsing.update_path(entry, source_path, zipped, replace=True)

        # A list is always returned, even for a single entry, so that processors are
        # able to drop log entries or split one log entry into several.
        return [updated]