Parsing

Introduction

I recently had to write a parser for Nokia's SROS network operating system, and I learned a great deal by doing it. As one often does, I started out with some pretty terrible attempts and gradually found my way out of the mess I had created.

I wanted to share my approach; hopefully some of it will be useful to others.

Challenge

Network configuration syntax is often full of context and meant for humans to write rather than for machines to read. This poses a challenge when writing parsers, as they have to account for where a certain statement is meant to be applied.

Juniper's JUNOS operating system is the prime example of how to make a network OS that is easy to read both for humans and for machines, but it also differs a great deal from other operating systems.

I am by no means an expert in Nokia's SROS, but I had to become comfortable with it when I took over the operation of a network based on it.

Flattening

SROS configuration uses an indentation-based syntax. This proved hard for me to write a parser for, but luckily there were several libraries out there that flatten the configuration, making it much easier to parse.

After running the configuration through the flattening code, I needed a way to organize the regular expressions I used to parse it.
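To make the idea of flattening concrete, here is a minimal sketch of what such a step might look like. It is not the library I used; the function name `flatten`, the sample input and the handling of `exit` lines are all assumptions made for illustration, and real SROS output has more corner cases than this covers.

```python
def flatten(config_text):
    """Turn an indented configuration into one full-path line per statement.

    Hypothetical helper: assumes nesting is expressed purely through
    indentation and that 'exit' lines only close a block.
    """
    entries = []
    for raw in config_text.splitlines():
        text = raw.strip()
        # Skip blank lines, comment/separator lines and block terminators.
        if not text or text.startswith("#") or text in ("exit", "exit all"):
            continue
        indent = len(raw) - len(raw.lstrip())
        entries.append((indent, text))

    flat = []
    stack = []  # (indent, text) of the blocks enclosing the current line
    for i, (indent, text) in enumerate(entries):
        # Leave every block that is not a parent of this line.
        while stack and stack[-1][0] >= indent:
            stack.pop()
        has_children = i + 1 < len(entries) and entries[i + 1][0] > indent
        if has_children:
            stack.append((indent, text))
        else:
            path = [parent for _, parent in stack] + [text]
            flat.append("/" + " ".join(path))
    return flat


SAMPLE = """
configure
    router
        interface "int-2/1/1"
            address 10.0.0.1/30
            port 2/1/1
            no shutdown
        exit
    exit
"""

for line in flatten(SAMPLE):
    print(line)
```

Each printed line carries its whole context, so a single regular expression can decide both where a statement belongs and what it sets.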

Generic parser

I came up with a generic class that provides shorthand functions, which I then subclass to provide specific parsers.

It looks like this:

    import re


    class GenericParser(object):
        """This is the GenericParser object. Subclass this for each
        specific parser, and override the parsers list.

        parsers is a list of tuples. Each tuple should be a string and a
        re.compile() object. The regular expression should have either one
        or no capture group. When one is present, the string in the tuple
        will define a variable that contains the result of the capture.

        Otherwise it will be a true/false value.
        """

        parsers = []
        global_parsers = [
            ('description', re.compile(r'description "(.+)"\s*$')),
            ('disabled', re.compile(r'\s[^n][^o] shutdown\s*$')),
        ]

        def __init__(self, ignored_keywords=None):
            """
            This initiates the parser object, sets a generic set of
            patterns and methods.

            Use GenericParser.parameters for parameter access
            Use GenericParser.debug_parameters for full trace of the parser runs
            """
            ignored = set(ignored_keywords or [])
            self.parameters = {}
            self.debug_parameters = []
            # Build a per-instance list so the class-level lists are never mutated.
            local_names = [name for name, _ in self.parsers]
            candidates = list(self.parsers) + [
                parser for parser in self.global_parsers
                if parser[0] not in local_names
            ]
            # Drop every parser whose name is listed in ignored_keywords.
            self.parsers = [p for p in candidates if p[0] not in ignored]

        def parse(self, line):
            """
            Parse the line, add debugging information for stack tracing
            """
            debug = {'line': line, 'parser_runs': []}
            for parameter, expression in self.parsers:
                run_debug = {'parameter': parameter, 'expression': expression}
                res = expression.search(line)
                if res is not None:
                    run_debug['result'] = True
                    if len(res.groups()) == 0:
                        self.parameters[parameter] = True
                    else:
                        self.parameters[parameter] = res.group(1)
                else:
                    run_debug['result'] = False
                debug['parser_runs'].append(run_debug)
            # Keep the trace so debug_parameters holds one entry per parsed line.
            self.debug_parameters.append(debug)

This GenericParser object allows subclassing into small, handy classes like this:

    class Routed_interface(GenericParser):

        parsers = [
            ('ipv4_address', re.compile(
                r'interface "[^"]+" address (\d+\.\d+\.\d+\.\d+/\d+)\s*$')),
            ('port', re.compile(r'interface "[^"]+" port ([0-9/]+)\s*$')),
            ('admin_down', re.compile(r'interface "[^"]+" shutdown\s*$')),
            ('port', re.compile(
                r'interface "\S+" sap ([0-9/]+):[0-9]+ create\s*$')),
            ('sap_description', re.compile(
                r'interface "\S+" sap [0-9/:]+ description "(.+)"\s*$')),
            ('vlan', re.compile(
                r'interface "\S+" sap [0-9/]+:([0-9]+) create\s*$')),
            ('vrrp_instance', re.compile(r'vrrp ([0-9]+)')),
            ('vrrp_address', re.compile(
                r'vrrp [0-9]+ backup (\d+\.\d+\.\d+\.\d+)\s*$')),
            ('vrrp_priority', re.compile(r'vrrp [0-9]+ priority ([0-9]+)\s*$')),
            ('vrrp_echo', re.compile(r'vrrp [0-9]+ ping-reply\s*$')),
            ('vpls', re.compile(r'interface ".+" vpls "(.+)"\s*$'))
        ]

Here you see how I am actually just providing a list of tuples, each containing the name of the thing I want to find and a regular expression to search with.

If the expression has a capture group, the named parameter will contain the captured text; if not, it is a boolean set to true when the regex matches.
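The capture-group rule can be shown in isolation with a few lines. This is only a condensed sketch: `apply_parsers` and `PARSERS` are names made up for the example, and the two expressions are copied from the list above.

```python
import re

# Two entries in the same (name, expression) shape used by the parsers list.
PARSERS = [
    ("port", re.compile(r'interface "[^"]+" port ([0-9/]+)\s*$')),
    ("admin_down", re.compile(r'interface "[^"]+" shutdown\s*$')),
]


def apply_parsers(parsers, line):
    """Return the parameters that a single line sets."""
    result = {}
    for name, expression in parsers:
        match = expression.search(line)
        if match is None:
            continue
        # One capture group: store the captured text. No group: store True.
        result[name] = match.group(1) if match.groups() else True
    return result


print(apply_parsers(PARSERS, '/configure router interface "int-2/1/1" port 2/1/1'))
print(apply_parsers(PARSERS, '/configure router interface "int-2/1/1" shutdown'))
print(apply_parsers(PARSERS, '/configure router interface "int-2/1/1" no shutdown'))
```

The first call yields the captured port string, the second yields a boolean flag, and the third yields nothing because neither expression matches a "no shutdown" line.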

Structure

The problem with this structure is that I still need a place to /store/ the results. I want some sort of dictionary containing, for example, each routed interface matched by the parser above.

For this, I created another class/subclass structure. I won't show the code here, as it is too long for a blog post, but it allows me to create constructs like this:

    class Routed_interface(GenericParseStructure):
        """
        Stores routed interfaces (non-VRF)
        """
        structure = {
            'context': r'^/configure router\s?(?:[a-zA-Z0-9-]+)? interface\s',
            'container': 'dict',
            'key': r'^/configure router\s?(?:[a-zA-Z0-9-]+)? interface "(\S+)"',
            'parser': srosparsers.Routed_interface
        }

If you look at the pytest case for Routed_interface, you will see how easy it is to use:

    from collections import OrderedDict


    def test_routed_interface():
        config_lines = [
            '/configure router interface "int-2/1/1" address 10.0.0.1/30',
            '/configure router interface "int-2/1/1" description "DESCRIPTION"',
            '/configure router interface "int-2/1/1" port 2/1/1',
            '/configure router interface "int-2/1/1" no shutdown',
        ]
        expected = OrderedDict(
            [
                (
                    "int-2/1/1",
                    {
                        "ipv4_address": "10.0.0.1/30",
                        "description": "DESCRIPTION",
                        "port": "2/1/1",
                    },
                )
            ]
        )

        interface = srosstruct.Routed_interface()
        for line in config_lines:
            interface.process(line)

        data = interface.data()

        assert data == expected
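Since the real GenericParseStructure is not shown, here is a rough sketch of how a class with the same `structure` keys could behave. Everything in it is an assumption for illustration: `SketchParser` and `SketchStructure` are made-up names, only the dict container is handled, and the three expressions are simplified stand-ins rather than my actual parsers.

```python
import re
from collections import OrderedDict


class SketchParser:
    """Hypothetical stand-in for a GenericParser subclass."""
    parsers = [
        ("ipv4_address", re.compile(r'address (\d+\.\d+\.\d+\.\d+/\d+)\s*$')),
        ("port", re.compile(r'port ([0-9/]+)\s*$')),
        ("description", re.compile(r'description "(.+)"\s*$')),
    ]

    def __init__(self):
        self.parameters = {}

    def parse(self, line):
        for name, expression in self.parsers:
            match = expression.search(line)
            if match:
                self.parameters[name] = match.group(1) if match.groups() else True


class SketchStructure:
    """Hypothetical structure: one parser instance per key found in context."""
    structure = {
        "context": r'^/configure router\s?(?:[a-zA-Z0-9-]+)? interface\s',
        "key": r'^/configure router\s?(?:[a-zA-Z0-9-]+)? interface "(\S+)"',
        "parser": SketchParser,
    }

    def __init__(self):
        self._context = re.compile(self.structure["context"])
        self._key = re.compile(self.structure["key"])
        self._items = OrderedDict()

    def process(self, line):
        # Ignore lines that belong to a different part of the configuration.
        if not self._context.search(line):
            return
        match = self._key.search(line)
        if match is None:
            return
        key = match.group(1)
        if key not in self._items:
            self._items[key] = self.structure["parser"]()
        self._items[key].parse(line)

    def data(self):
        return OrderedDict(
            (key, parser.parameters) for key, parser in self._items.items()
        )


interfaces = SketchStructure()
for line in [
    '/configure router interface "int-2/1/1" address 10.0.0.1/30',
    '/configure router interface "int-2/1/1" description "DESCRIPTION"',
    '/configure router interface "int-2/1/1" port 2/1/1',
    '/configure port 2/1/1 description "not an interface"',
]:
    interfaces.process(line)
print(interfaces.data())
```

The context expression decides whether a line is relevant at all, and the key expression decides which entry the line's parameters are stored under.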

Putting it all together

The last object I created was just to organize all the structures in one class:

    class SROSParser(object):
        """
        This is the shorthand parser that fits in on top of srosstruct
        """

        def __init__(self):
            """
            Initialize the parser object
            """
            self.parsers = {
                'node': srosstruct.Node(),
                'ports': srosstruct.Port(),
                'lags': srosstruct.LAG(),
                'ospf': srosstruct.BaseOSPF(),
                'ospf_interfaces': srosstruct.BaseOSPFInterface(),
                'paths': srosstruct.Path(),
                'hops': srosstruct.Hop(),
                'rsvp_lsps': srosstruct.RSVP_LSP(),
                'interfaces': srosstruct.Routed_interface(),
                'static_routes': srosstruct.StaticRoute(),
                'bgp_groups': srosstruct.BGPGroup(),
                'bgp_peers': srosstruct.BGPPeer(),
                'routing_options': srosstruct.RoutingOptions(),
                'vrfs': srosstruct.VPRN(),
                'vrf_interfaces': srosstruct.VPRNInterface(),
                'vrf_static_routes': srosstruct.VPRNStaticRoute(),
                'vrf_ospf': srosstruct.VPRNOSPF(),
                'vrf_ospf_interfaces': srosstruct.VPRNOSPFInterface(),
                'vrf_bgp_groups': srosstruct.VPRNBGPGroup(),
                'vrf_bgp_peers': srosstruct.VPRNBGPPeer(),
                'prefix_lists': srosstruct.PrefixList(),
                'lsps': srosstruct.SDP(),
                'service_names': srosstruct.ServiceName(),
                'l2vpn': srosstruct.L2VPN(),
                'pseudowires': srosstruct.Pseudowire(),
                'l2vpn_interfaces': srosstruct.L2Interface(),
                'cards': srosstruct.Card(),
                'mdas': srosstruct.MDA()
            }

        def parse(self, line):
            """
            Runs a line through all the parsers
            """
            for parser in self.parsers.values():
                parser.process(line)

This now allows me to loop through the configuration lines and call SROSParser.parse() on each one.

Afterwards I can use SROSParser.parsers[name].data() to get the structured parsed data.
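The calling pattern can be sketched end to end with stand-in structures. `LineCollector`, `MiniParser` and `parse_config` are hypothetical names invented for this example; the real srosstruct classes do far more than collect lines by prefix.

```python
class LineCollector:
    """Hypothetical stand-in for a srosstruct class: keeps matching lines."""

    def __init__(self, prefix):
        self.prefix = prefix
        self.lines = []

    def process(self, line):
        if line.startswith(self.prefix):
            self.lines.append(line)

    def data(self):
        return list(self.lines)


class MiniParser:
    """Same shape as SROSParser, with only two stand-in structures."""

    def __init__(self):
        self.parsers = {
            "interfaces": LineCollector("/configure router interface "),
            "ports": LineCollector("/configure port "),
        }

    def parse(self, line):
        for parser in self.parsers.values():
            parser.process(line)


def parse_config(flat_lines):
    """Feed every flattened line to the parser and collect data by name."""
    parser = MiniParser()
    for line in flat_lines:
        parser.parse(line)
    return {name: struct.data() for name, struct in parser.parsers.items()}


result = parse_config([
    '/configure port 2/1/1 description "uplink"',
    '/configure router interface "int-2/1/1" port 2/1/1',
])
print(result["interfaces"])
print(result["ports"])
```

Every structure sees every line and decides for itself whether the line is relevant, which keeps the top-level loop trivial.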

Conclusion

The goal was to create a structure that is easy to maintain. To keep the amount of actual processing code small, I created a number of abstraction classes that allow maximum code reuse.

This also allowed me to write effective tests, so I can check that the parser keeps working and catch regressions.

Using the above code, I've managed to do this reasonably well.
