# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

"""Grove processor to extract and map fields using JMESPaths.

This processor is intended to be used to transform raw log entries into a common schema.
This is especially useful for ensuring that all collected log entries from differing
upstream vendors are in a consistent format - whether industry standard, or bespoke.
"""

import json
from typing import Any, Dict, List, Optional

import jmespath
from pydantic import BaseModel, Extra, validator

from grove.helpers import parsing
from grove.models import ProcessorConfig
from grove.processors import BaseProcessor


class Mapping(BaseModel, extra=Extra.forbid):
    """Expresses the configuration fields used to specify path mapping."""

    # Destination specifies where to write extracted or specified values into. This
    # can be a nested path, with subsequent dimensions specified with dots (`.`).
    destination: str

    # Sources defines a list of JMESPaths to map into the destination. If multiple
    # are provided, the sources are processed in order with the first match winning.
    sources: List[str] = []

    # Static allows a static field to be written into the destination, rather than
    # extraction from the source. This field is incompatible with sources.
    static: Optional[str]

    @validator("static")
    def static_or_sources(cls, value, values):
        """Ensures that either sources or static is set, not both.

        :param value: The candidate value for the ``static`` field.
        :param values: The fields validated before ``static``, keyed by name.

        :return: The ``static`` value, unchanged.

        :raises ValueError: If a non-empty ``static`` is combined with one or more
            ``sources``.
        """
        # A field which failed its own validation is left out of `values`, so the
        # lookup may yield None. Truthiness gives the same answer as a length check
        # for lists, without raising an unrelated TypeError in that case.
        if value and values.get("sources"):
            raise ValueError("Either sources or static should be set, not both.")

        return value


class Handler(BaseProcessor):
    """Extract and map fields using JMESPaths."""

    class Configuration(ProcessorConfig, extra=Extra.forbid):
        """Expresses the configuration and associated validators for the processor."""

        # Path under which the original entry, serialised as a compact JSON string,
        # is stored in the output. When unset, only explicitly mapped fields appear
        # in the output.
        raw: Optional[str]

        # The field mappings to apply, in order.
        fields: List[Mapping]

    def process(self, entry: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Build a new log entry containing only the configured mappings.

        :param entry: A collected log entry.

        :return: A single-element list holding the newly mapped entry.
        """
        mapped: Dict[str, Any] = {}

        # Optionally keep a copy of the whole original entry, as compact JSON, under
        # the configured path.
        raw_path = self.configuration.raw
        if raw_path:
            serialised = json.dumps(entry, separators=(",", ":"))
            mapped = parsing.update_path(
                mapped,
                parsing.quote_aware_split(raw_path),
                serialised,
            )

        for mapping in self.configuration.fields:
            target = parsing.quote_aware_split(mapping.destination)

            # A non-empty static value takes priority, in which case no source is
            # consulted at all.
            extracted = mapping.static
            candidates = [] if extracted else mapping.sources

            # Sources are tried in order and the first truthy result wins. Falsy
            # results (such as 0, False or an empty string) do not count as a match,
            # so if nothing truthy is found the result of the last source is kept.
            for expression in candidates:
                extracted = jmespath.search(expression, entry)
                if extracted:
                    break

            # Merge the value into whatever is already nested under the same path,
            # creating the path if required.
            mapped = parsing.update_path(mapped, target, extracted)

        # Always hand back a list, even for a single entry, so that processors are
        # able to drop an entry or split one entry into several.
        return [mapped]