Parsing
Introduction
I recently had to write a parser for Nokia's SROS network operating system, and I learned a great deal by doing so. As one often does, I started out with some pretty terrible attempts and gradually found my way out of the mess I had created.
I wanted to share my approach; hopefully something here will be useful to others.
Challenge
Network configuration syntax is often full of context and is meant for humans to write rather than for machines to read. This poses a challenge when writing parsers, as they have to account for where a given statement is meant to be applied.
Juniper's JUNOS operating system is the prime example of a network OS that is easy to read for both humans and machines, but it also differs a great deal from other operating systems.
I am by no means an expert in Nokia's SROS, but I had to become comfortable with it, as I took over the operation of a network based on this.
Flattening
SROS uses an indentation-based syntax. This proved hard for me to write a parser for, but luckily there were several libraries out there that flatten the configuration, making it much easier to parse.
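To make "flattening" concrete, here is a minimal sketch of the idea. It is not one of the libraries mentioned above; the flatten() function is a hypothetical helper written for this post, and it assumes four-space indentation, blocks closed by "exit", and that comment lines ("#") and "echo" lines can be ignored.

```python
def flatten(config_text, indent=4):
    """Turn an indented configuration into one full-path line per leaf statement."""
    # Keep only meaningful lines as (depth, statement) pairs.
    statements = []
    for raw in config_text.splitlines():
        text = raw.strip()
        if not text or text.startswith('#') or text.startswith('echo '):
            continue
        if text == 'exit' or text.startswith('exit '):
            continue
        depth = (len(raw) - len(raw.lstrip(' '))) // indent
        statements.append((depth, text))

    flat = []
    stack = []  # statements of the enclosing blocks, outermost first
    for i, (depth, text) in enumerate(statements):
        del stack[depth:]  # leave any blocks we have stepped out of
        stack.append(text)
        is_last = i + 1 == len(statements)
        # A statement is a leaf when the next statement is not nested deeper.
        if is_last or statements[i + 1][0] <= depth:
            flat.append('/' + ' '.join(stack))
    return flat


SAMPLE = '''
configure
    router
        interface "int-2/1/1"
            address 10.0.0.1/30
            port 2/1/1
            no shutdown
        exit
    exit
exit
'''

flat = flatten(SAMPLE)
for line in flat:
    print(line)
```

Each output line carries its full context, for example /configure router interface "int-2/1/1" port 2/1/1, which is what makes a line-by-line regular expression approach workable.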
After flattening the configuration, I needed a way to organize the regular expressions I used to parse it.
Generic parser
I came up with a generic class that provides shorthand functions, which I then subclass to provide specific parsers.
It looks like this:
import re


class GenericParser(object):
    """This is the GenericParser object.

    Subclass this for each specific parser, and override the parsers list.
    parsers is a list of tuples. Each tuple should be a string and a
    re.compile() object. The regular expression should have either one
    or no capture group. When one is present, the string in the tuple
    will define a variable that contains the result of the capture.
    Otherwise it will be a true/false value.
    """
    parsers = []
    global_parsers = [
        ('description', re.compile(r'description "(.+)"\s*$')),
        # Match "shutdown" only when it is not preceded by "no".
        ('disabled', re.compile(r'(?<!\sno)\sshutdown\s*$')),
    ]

    def __init__(self, ignored_keywords=None):
        """
        This initiates the parser object, sets a generic set of patterns and methods.
        Use GenericParser.parameters for parameter access
        Use GenericParser.debug_parameters for full trace of the parser runs
        """
        ignored_keywords = ignored_keywords or []
        # Build a per-instance list so the class-level list is never mutated.
        self.parsers = [p for p in self.parsers if p[0] not in ignored_keywords]
        self.parameters = {}
        self.debug_parameters = []
        local_parsers = [p[0] for p in self.parsers]
        for parser in self.global_parsers:
            if parser[0] not in local_parsers:
                self.parsers.append(parser)

    def parse(self, line):
        """ Parse the line, add debugging information for stack tracing
        """
        debug = {'line': line, 'parser_runs': []}
        for parameter, expression in self.parsers:
            run_debug = {'parameter': parameter, 'expression': expression}
            res = expression.search(line)
            if res is not None:
                run_debug['result'] = True
                if len(res.groups()) == 0:
                    self.parameters[parameter] = True
                else:
                    self.parameters[parameter] = res.group(1)
            else:
                run_debug['result'] = False
            debug['parser_runs'].append(run_debug)
        # Keep the trace so debug_parameters holds one entry per parsed line.
        self.debug_parameters.append(debug)
This GenericParser object allows subclassing into small, handy classes, like this:
class Routed_interface(GenericParser):
    parsers = [
        ('ipv4_address', re.compile(
            r'interface "[^"]+" address (\d+\.\d+\.\d+\.\d+/\d+)\s*$')),
        ('port', re.compile(r'interface "[^"]+" port ([0-9/]+)\s*$')),
        ('admin_down', re.compile(r'interface "[^"]+" shutdown\s*$')),
        ('port', re.compile(
            r'interface "\S+" sap ([0-9/]+):[0-9]+ create\s*$')),
        ('sap_description', re.compile(
            r'interface "\S+" sap [0-9/:]+ description "(.+)"\s*$')),
        ('vlan', re.compile(
            r'interface "\S+" sap [0-9/]+:([0-9]+) create\s*$')),
        ('vrrp_instance', re.compile(r'vrrp ([0-9]+)')),
        ('vrrp_address', re.compile(
            r'vrrp [0-9]+ backup (\d+\.\d+\.\d+\.\d+)\s*$')),
        ('vrrp_priority', re.compile(r'vrrp [0-9]+ priority ([0-9]+)\s*$')),
        ('vrrp_echo', re.compile(r'vrrp [0-9]+ ping-reply\s*$')),
        ('vpls', re.compile(r'interface ".+" vpls "(.+)"\s*$'))
    ]
Here you can see that I am really just providing a list of tuples, each containing the name of the thing I want to extract and a regular expression to search with.
If the expression defines a capture group, the name will hold the captured text; if not, it is a boolean set to true when the regex matches.
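The capture-group rule can be tried in isolation with a few lines of code. This is a standalone simplification rather than the GenericParser class itself; apply_patterns() is a hypothetical helper, and the two patterns are copied from the list above.

```python
import re

PATTERNS = [
    # One capture group: the parameter holds the captured text.
    ('port', re.compile(r'interface "[^"]+" port ([0-9/]+)\s*$')),
    # No capture group: the parameter is simply set to True on a match.
    ('admin_down', re.compile(r'interface "[^"]+" shutdown\s*$')),
]


def apply_patterns(patterns, lines):
    """Run every pattern against every line and collect the results."""
    parameters = {}
    for line in lines:
        for name, expression in patterns:
            match = expression.search(line)
            if match is None:
                continue
            parameters[name] = match.group(1) if match.groups() else True
    return parameters


result = apply_patterns(PATTERNS, [
    '/configure router interface "int-2/1/1" port 2/1/1',
    '/configure router interface "int-2/1/1" shutdown',
])
print(result)  # {'port': '2/1/1', 'admin_down': True}
```

Parameters that never match are simply absent from the result, so a caller can tell "not configured" apart from an explicit value.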
Structure
The problem with this design is that I still need a place to store the results. I want some sort of dictionary containing, e.g., each routed interface matched by the parser above.
For this, I created another class/subclass structure. I won't show the code here, as it is too long for a blog post, but it allows me to create constructs like this:
class Routed_interface(GenericParseStructure):
    """ Stores routed interfaces (non-VRF) """
    structure = {
        'context': r'^/configure router\s?(?:[a-zA-Z0-9-]+)? interface\s',
        'container': 'dict',
        'key': r'^/configure router\s?(?:[a-zA-Z0-9-]+)? interface "(\S+)"',
        'parser': srosparsers.Routed_interface
    }
The pytest case for Routed_interface shows how easy this is to use:
from collections import OrderedDict


def test_routed_interface():
    # Flattened configuration lines for a single routed interface.
    lines = [
        '/configure router interface "int-2/1/1" address 10.0.0.1/30',
        '/configure router interface "int-2/1/1" description "DESCRIPTION"',
        '/configure router interface "int-2/1/1" port 2/1/1',
        '/configure router interface "int-2/1/1" no shutdown',
    ]
    expected = OrderedDict(
        [
            (
                "int-2/1/1",
                {
                    "ipv4_address": "10.0.0.1/30",
                    "description": "DESCRIPTION",
                    "port": "2/1/1",
                },
            )
        ]
    )
    interface = srosstruct.Routed_interface()
    for line in lines:
        interface.process(line)
    data = interface.data()
    assert data == expected
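Since the structure code itself is not shown, here is a minimal sketch of how a class with this process()/data() interface could work. Everything in it is an assumption made for illustration: SketchStructure, TinyParser and RoutedInterfaceSketch are hypothetical names, only the 'dict' container is handled, and the real GenericParseStructure may well be organized differently.

```python
import re
from collections import OrderedDict


class TinyParser:
    """Hypothetical stand-in for a GenericParser subclass."""
    parsers = [
        ('ipv4_address', re.compile(r'address (\d+\.\d+\.\d+\.\d+/\d+)\s*$')),
        ('port', re.compile(r'port ([0-9/]+)\s*$')),
    ]

    def __init__(self):
        self.parameters = {}

    def parse(self, line):
        for name, expression in self.parsers:
            match = expression.search(line)
            if match:
                self.parameters[name] = match.group(1) if match.groups() else True


class SketchStructure:
    """Routes matching lines to one parser instance per key (assumed design)."""
    structure = {}

    def __init__(self):
        self._context = re.compile(self.structure['context'])
        self._key = re.compile(self.structure['key'])
        self._items = OrderedDict()

    def process(self, line):
        # Ignore lines outside this structure's context.
        if not self._context.search(line):
            return
        key_match = self._key.search(line)
        if key_match is None:
            return
        key = key_match.group(1)
        # Create a fresh parser the first time a key is seen.
        if key not in self._items:
            self._items[key] = self.structure['parser']()
        self._items[key].parse(line)

    def data(self):
        return OrderedDict((key, p.parameters) for key, p in self._items.items())


class RoutedInterfaceSketch(SketchStructure):
    structure = {
        'context': r'^/configure router\s?(?:[a-zA-Z0-9-]+)? interface\s',
        'container': 'dict',
        'key': r'^/configure router\s?(?:[a-zA-Z0-9-]+)? interface "(\S+)"',
        'parser': TinyParser,
    }


interfaces = RoutedInterfaceSketch()
for config_line in [
    '/configure router interface "int-2/1/1" address 10.0.0.1/30',
    '/configure router interface "int-2/1/1" port 2/1/1',
    '/configure port 2/1/1 no shutdown',
]:
    interfaces.process(config_line)
sketch_data = interfaces.data()
```

The context expression decides whether a line belongs to the structure at all, and the key expression decides which entry it updates, so the third line above is dropped without ever reaching a parser.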
Putting it all together
The last object I created was just to organize all the structures in one class:
class SROSParser(object):
    """ This is the shorthand parser that fits in on top of srosstruct """

    def __init__(self):
        """ Initialize the parser object """
        self.parsers = {
            'node': srosstruct.Node(),
            'ports': srosstruct.Port(),
            'lags': srosstruct.LAG(),
            'ospf': srosstruct.BaseOSPF(),
            'ospf_interfaces': srosstruct.BaseOSPFInterface(),
            'paths': srosstruct.Path(),
            'hops': srosstruct.Hop(),
            'rsvp_lsps': srosstruct.RSVP_LSP(),
            'interfaces': srosstruct.Routed_interface(),
            'static_routes': srosstruct.StaticRoute(),
            'bgp_groups': srosstruct.BGPGroup(),
            'bgp_peers': srosstruct.BGPPeer(),
            'routing_options': srosstruct.RoutingOptions(),
            'vrfs': srosstruct.VPRN(),
            'vrf_interfaces': srosstruct.VPRNInterface(),
            'vrf_static_routes': srosstruct.VPRNStaticRoute(),
            'vrf_ospf': srosstruct.VPRNOSPF(),
            'vrf_ospf_interfaces': srosstruct.VPRNOSPFInterface(),
            'vrf_bgp_groups': srosstruct.VPRNBGPGroup(),
            'vrf_bgp_peers': srosstruct.VPRNBGPPeer(),
            'prefix_lists': srosstruct.PrefixList(),
            'lsps': srosstruct.SDP(),
            'service_names': srosstruct.ServiceName(),
            'l2vpn': srosstruct.L2VPN(),
            'pseudowires': srosstruct.Pseudowire(),
            'l2vpn_interfaces': srosstruct.L2Interface(),
            'cards': srosstruct.Card(),
            'mdas': srosstruct.MDA()
        }

    def parse(self, line):
        """ Runs a line through all the parsers """
        for parser in self.parsers.values():
            parser.process(line)
This now allows me to loop through each configuration line and call SROSParser.parse() on it.
Afterwards I can use SROSParser.parsers[name].data() to get the structured parsed data.
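As a rough illustration of that loop, the sketch below fans each flattened line out to a dictionary of structures and then reads the results back. PrefixStructure and FanOutParser are hypothetical stand-ins invented for this example, since the real srosstruct classes are not shown in this post; only the process()/data() interface and the parse() loop mirror the code above.

```python
class PrefixStructure:
    """Hypothetical stand-in for a srosstruct class: keeps lines with a given prefix."""

    def __init__(self, prefix):
        self.prefix = prefix
        self.lines = []

    def process(self, line):
        if line.startswith(self.prefix):
            self.lines.append(line)

    def data(self):
        return list(self.lines)


class FanOutParser:
    """Same shape as SROSParser, with two toy structures instead of the real ones."""

    def __init__(self):
        self.parsers = {
            'interfaces': PrefixStructure('/configure router interface '),
            'ports': PrefixStructure('/configure port '),
        }

    def parse(self, line):
        # Every structure sees every line and decides for itself what to keep.
        for parser in self.parsers.values():
            parser.process(line)


config = [
    '/configure port 2/1/1 no shutdown',
    '/configure router interface "int-2/1/1" port 2/1/1',
]
sros = FanOutParser()
for line in config:
    sros.parse(line)

# Collect the structured data from every registered structure.
result = {name: parser.data() for name, parser in sros.parsers.items()}
```

Passing every line to every structure costs a few extra regex checks per line, but it keeps the top-level class free of any routing logic.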
Conclusion
The goal of this design was to create something that is easy to maintain. To keep the amount of actual processing code low, I created a number of abstraction classes that allow maximum code reuse.
This also allowed me to write effective tests, so I can check that my parser keeps working and avoid regressions.
Using the above code, I've managed to do this reasonably well.