#!/usr/bin/python
#
# Copyright (c) 2012 IETF Trust and the persons identified as
# authors of the code. All rights reserved. Redistribution and use
# in source and binary forms, with or without modification, is
# permitted pursuant to, and subject to the license terms contained
# in, the Simplified BSD License set forth in Section 4.c of the
# IETF Trust's Legal Provisions Relating to IETF
# Documents (http://trustee.ietf.org/license-info).
"""Simple format validator for self-published ipgeo feeds.
This tool reads CSV data in the self-published ipgeo feed format
from the standard input and performs basic validation. It is
intended for use by feed publishers before launching a feed.
"""
import csv
import ipaddr
import re
import sys
class IPGeoFeedValidator(object):
    """Line-oriented format validator for self-published ipgeo feeds.

    Each feed line is parsed as CSV and its first three fields (IP address
    or prefix, alpha-2 country code, region code) are checked.  Every
    problem is appended to ``output_log`` under its severity ('ERROR' or
    'WARNING') and, unless logging is disabled, written to the configured
    output stream.
    """

    # Exactly two ASCII letters; str.isalpha() would also accept
    # non-ASCII letters on Python 3.
    _ALPHA2_RE = re.compile(r'[A-Za-z]{2}\Z')

    def __init__(self):
        self.prefixes = {}
        self.line_number = 0
        self.output_log = {}
        # Per-line state, reset by _ValidateLine().
        self.line = ''
        self.is_correct_line = True
        self.SetOutputStream(sys.stderr)

    def Validate(self, feed):
        """Check validity of an IPGeo feed.

        Args:
          feed: iterable with feed lines
        """
        for line in feed:
            self._ValidateLine(line)

    def SetOutputStream(self, logfile):
        """Controls where the output messages go to (STDERR by default).

        Use None to disable logging.

        Args:
          logfile: a file object (e.g., sys.stdout) or None.
        """
        self.output_stream = logfile

    def CountErrors(self, severity):
        """How many ERRORs or WARNINGs were generated.

        Args:
          severity: 'ERROR' or 'WARNING'.

        Returns:
          Number of messages logged so far at that severity.
        """
        return len(self.output_log.get(severity, []))

    ############################################################
    def _ValidateLine(self, line):
        """Validate one raw feed line and echo it if anything was reported.

        Args:
          line: one line of the feed, with or without its line terminator.
        """
        line = line.rstrip('\r\n')
        self.line_number += 1
        # Only the text before the first '#' is kept for the diagnostic
        # echo; CSV parsing below still operates on the whole line.
        self.line = line.split('#')[0]
        self.is_correct_line = True

        if self._ShouldIgnoreLine(line):
            return

        try:
            fields = next(csv.reader([line]), [])
        except csv.Error as error:
            # Report unparsable input instead of crashing the validator.
            self._ReportError('Malformed CSV line (%s).' % error)
        else:
            if fields:
                self._ValidateFields(fields)
        self._FlushOutputStream()

    def _ShouldIgnoreLine(self, line):
        """Return True for blank lines and lines whose first
        non-whitespace character is '#'."""
        stripped = line.strip()
        return not stripped or stripped.startswith('#')

    ############################################################
    def _ValidateFields(self, fields):
        """Run the per-field checks on one parsed CSV record.

        Args:
          fields: non-empty list of field strings.

        Raises:
          ValueError: if fields is empty.
        """
        if not fields:
            raise ValueError('fields must not be empty')

        self._IsIPAddressOrPrefixCorrect(fields[0])
        if len(fields) > 1:
            self._IsAlpha2CodeCorrect(fields[1])
        if len(fields) > 2:
            self._IsRegionCodeCorrect(fields[2])
        if len(fields) != 5:
            self._ReportWarning('5 fields were expected (got %d).'
                                % len(fields))

    ############################################################
    def _IsIPAddressOrPrefixCorrect(self, field):
        """Dispatch to the CIDR check when field contains '/', otherwise
        to the single-address check."""
        if '/' in field:
            return self._IsCIDRCorrect(field)
        return self._IsIPAddressCorrect(field)

    def _IsCIDRCorrect(self, cidr):
        """Return True if cidr parses as a network whose address equals
        its network address and which is not private; otherwise report
        an ERROR and return False."""
        try:
            ipprefix = ipaddr.IPNetwork(cidr)
        except ValueError:
            self._ReportError('Incorrect IP Network.')
            return False
        # Reject prefixes written with host bits set (e.g. x.x.x.88/24).
        if ipprefix.network._ip != ipprefix._ip:
            self._ReportError('Incorrect IP Network.')
            return False
        if ipprefix.is_private:
            self._ReportError('IP Address must not be private.')
            return False
        return True

    def _IsIPAddressCorrect(self, ipaddress):
        """Return True if ipaddress parses as a non-private IP address;
        otherwise report an ERROR and return False."""
        try:
            ip = ipaddr.IPAddress(ipaddress)
        except ValueError:
            self._ReportError('Incorrect IP Address.')
            return False
        if ip.is_private:
            self._ReportError('IP Address must not be private.')
            return False
        return True

    ############################################################
    def _IsAlpha2CodeCorrect(self, alpha2code):
        """Return True if alpha2code is empty (the field is optional) or
        consists of exactly two ASCII letters; otherwise report an ERROR
        and return False."""
        if not alpha2code:
            return True
        if self._ALPHA2_RE.match(alpha2code) is None:
            self._ReportError(
                'Alpha 2 code must be in the ISO 3166-1 alpha 2 format.')
            return False
        return True

    def _IsRegionCodeCorrect(self, region_code):
        """Return True if region_code is empty (the field is optional) or
        has the shape '<alpha-2 code>-<subdivision>' with both parts
        non-empty; otherwise report an ERROR and return False."""
        if not region_code:
            return True
        country, hyphen, subdivision = region_code.partition('-')
        # An empty country part must be rejected here: the alpha-2 check
        # treats the empty string as "field omitted" and accepts it.
        if not (hyphen and country and subdivision):
            self._ReportError('Region code must be in ISO 3166-2 format.')
            return False
        return self._IsAlpha2CodeCorrect(country)

    ############################################################
    def _ReportError(self, message):
        """Log message at severity 'ERROR'."""
        self._ReportWithSeverity('ERROR', message)

    def _ReportWarning(self, message):
        """Log message at severity 'WARNING'."""
        self._ReportWithSeverity('WARNING', message)

    def _ReportWithSeverity(self, severity, message):
        """Record message in output_log, mark the current line as
        incorrect and write the message to the output stream if set."""
        self.is_correct_line = False
        output_line = '%s: %s\n' % (severity, message)
        self.output_log.setdefault(severity, []).append(output_line)
        if self.output_stream is not None:
            self.output_stream.write(output_line)

    def _FlushOutputStream(self):
        """Echo the offending line (with its number) after its messages;
        does nothing for clean lines or when logging is disabled."""
        if self.is_correct_line:
            return
        if self.output_stream is None:
            return
        self.output_stream.write('line %d: %s\n\n'
                                 % (self.line_number, self.line))
############################################################
def main():
    """Validate the feed read from standard input.

    Terminates the process with exit status 1 when the validator logged
    at least one ERROR; otherwise returns normally.
    """
    validator = IPGeoFeedValidator()
    validator.Validate(sys.stdin)
    error_total = validator.CountErrors('ERROR')
    if error_total != 0:
        sys.exit(1)


if __name__ == '__main__':
    main()
A unit test file, "ipgeo_feed_validator_test.py", is provided as well.
It provides basic test coverage of the code above, though it does not
test correct handling of non-ASCII UTF-8 strings.
#!/usr/bin/python
#
# Copyright (c) 2012 IETF Trust and the persons identified as
# authors of the code. All rights reserved. Redistribution and use
# in source and binary forms, with or without modification, is
# permitted pursuant to, and subject to the license terms contained
# in, the Simplified BSD License set forth in Section 4.c of the
# IETF Trust's Legal Provisions Relating to IETF
# Documents (http://trustee.ietf.org/license-info).
import sys
from ipgeo_feed_validator import IPGeoFeedValidator
class IPGeoFeedValidatorTest(object):
    """Minimal self-contained test driver for IPGeoFeedValidator.

    Feeds individual lines to a validator whose logging is disabled and
    compares the number of ERROR and WARNING messages each line produces
    against the expected counts, tallying successes and failures.
    """

    def __init__(self):
        self.validator = IPGeoFeedValidator()
        self.validator.SetOutputStream(None)
        self.successes = 0
        self.failures = 0

    def Run(self):
        """Run every test case and print a summary.

        Each case is (feed line, expected ERROR count, expected WARNING
        count).

        Returns:
          True if no test case failed, False otherwise.
        """
        self.TestFeedLine('# asdf', 0, 0)
        self.TestFeedLine(' ', 0, 0)
        self.TestFeedLine('', 0, 0)
        self.TestFeedLine('asdf', 1, 1)
        self.TestFeedLine('asdf,US,,,', 1, 0)
        self.TestFeedLine('aaaa::,US,,,', 0, 0)
        self.TestFeedLine('zzzz::,US', 1, 1)
        self.TestFeedLine(',US,,,', 1, 0)
        self.TestFeedLine('55.66.77', 1, 1)
        self.TestFeedLine('55.66.77.888', 1, 1)
        self.TestFeedLine('55.66.77.asdf', 1, 1)
        self.TestFeedLine('2001:db8:cafe::/48,PL,PL-MZ,,02-784', 0, 0)
        self.TestFeedLine('2001:db8:cafe::/48', 0, 1)
        self.TestFeedLine('55.66.77.88,PL', 0, 1)
        self.TestFeedLine('55.66.77.88,PL,,,', 0, 0)
        self.TestFeedLine('55.66.77.88,,,,', 0, 0)
        self.TestFeedLine('55.66.77.88,ZZ,,,', 0, 0)
        self.TestFeedLine('55.66.77.88,US,,,', 0, 0)
        self.TestFeedLine('55.66.77.88,USA,,,', 1, 0)
        self.TestFeedLine('55.66.77.88,99,,,', 1, 0)
        self.TestFeedLine('55.66.77.88,US,US-CA,,', 0, 0)
        self.TestFeedLine('55.66.77.88,US,USA-CA,,', 1, 0)
        self.TestFeedLine('55.66.77.88,USA,USA-CA,,', 2, 0)
        self.TestFeedLine('55.66.77.88,US,US-CA,Mountain View,', 0, 0)
        self.TestFeedLine('55.66.77.88,US,US-CA,Mountain View,94043',
                          0, 0)
        self.TestFeedLine('55.66.77.88,US,US-CA,Mountain View,94043,'
                          '1600 Ampthitheatre Parkway', 0, 1)
        self.TestFeedLine('55.66.77.0/24,US,,,', 0, 0)
        self.TestFeedLine('55.66.77.88/24,US,,,', 1, 0)
        self.TestFeedLine('55.66.77.88/32,US,,,', 0, 0)
        self.TestFeedLine('55.66.77/24,US,,,', 1, 0)
        self.TestFeedLine('55.66.77.0/35,US,,,', 1, 0)
        self.TestFeedLine('172.15.30.1,US,,,', 0, 0)
        self.TestFeedLine('172.28.30.1,US,,,', 1, 0)
        self.TestFeedLine('192.167.100.1,US,,,', 0, 0)
        self.TestFeedLine('192.168.100.1,US,,,', 1, 0)
        self.TestFeedLine('10.0.5.9,US,,,', 1, 0)
        self.TestFeedLine('10.0.5.0/24,US,,,', 1, 0)
        self.TestFeedLine('fc00::/48,PL,,,', 1, 0)
        self.TestFeedLine('fe00::/48,PL,,,', 0, 0)
        print('%d tests passed, %d failed'
              % (self.successes, self.failures))
        return self.failures == 0

    def IsOutputLogCorrectAtSeverity(self, severity,
                                     expected_msg_count):
        """Compare the validator's message count at one severity with
        the expected count, printing a diagnostic on mismatch.

        Args:
          severity: 'ERROR' or 'WARNING'.
          expected_msg_count: number of messages expected at severity.

        Returns:
          True if the counts match, False otherwise.
        """
        msg_count = self.validator.CountErrors(severity)
        if msg_count == expected_msg_count:
            return True
        # .get() keeps the diagnostic usable even if nothing was ever
        # logged at this severity.
        logged = self.validator.output_log.get(severity, [])
        print('TEST FAILED: %s\nexpected %d %s[s], observed %d\n%s\n'
              % (self.validator.line, expected_msg_count, severity,
                 msg_count, str(logged)))
        return False

    def IsOutputLogCorrect(self, new_errors, new_warnings):
        """Check both severities (both are always evaluated so that each
        mismatch prints its own diagnostic).

        Args:
          new_errors: expected number of ERROR messages.
          new_warnings: expected number of WARNING messages.

        Returns:
          True only if both counts match.
        """
        errors_ok = self.IsOutputLogCorrectAtSeverity('ERROR', new_errors)
        warnings_ok = self.IsOutputLogCorrectAtSeverity('WARNING',
                                                        new_warnings)
        return errors_ok and warnings_ok

    def TestFeedLine(self, line, error_count, warning_count):
        """Validate one line and compare the resulting message counts.

        The second positional argument has always been compared against
        the ERROR count and the third against the WARNING count; the
        parameter names now say so.

        Args:
          line: feed line to validate.
          error_count: expected number of ERROR messages.
          warning_count: expected number of WARNING messages.

        Returns:
          True if both counts match, False otherwise.
        """
        # Start each case from an empty log so counts are per-line.
        self.validator.output_log['WARNING'] = []
        self.validator.output_log['ERROR'] = []
        self.validator._ValidateLine(line)
        if not self.IsOutputLogCorrect(error_count, warning_count):
            self.failures += 1
            return False
        self.successes += 1
        return True


if __name__ == '__main__':
    # Propagate the outcome so callers (shells, CI) can detect failures.
    sys.exit(0 if IPGeoFeedValidatorTest().Run() else 1)
Acknowledgements
The authors would like to express their gratitude to reviewers and
early implementors, including but not limited to Mikael Abrahamsson,
Andrew Alston, Ray Bellis, John Bond, Alissa Cooper, Andras Erdei,
Stephen Farrell, Marco Hogewoning, Mike Joseph, Maciej Kuzniar,
George Michaelson, Menno Schepers, Justyna Sidorska, Pim van Pelt,
and Bjoern A. Zeeb.
In particular, Richard L. Barnes and Andy Newton contributed
substantial review, text, and advice.
Authors' Addresses
Erik Kline
Loon LLC
1600 Amphitheatre Parkway
Mountain View, CA 94043
United States of America
Email: ek@loon.com
Krzysztof Duleba
Google
1600 Amphitheatre Parkway
Mountain View, CA 94043
United States of America
Email: kduleba@google.com
Zoltan Szamonek
Google Switzerland GmbH
Brandschenkestrasse 110
CH-8002 Zürich
Switzerland
Email: zszami@google.com
Stefan Moser
Google Switzerland GmbH
Brandschenkestrasse 110
CH-8002 Zürich
Switzerland
Email: smoser@google.com
Warren Kumari
Google
1600 Amphitheatre Parkway
Mountain View, CA 94043
United States of America
Email: warren@kumari.net
gemini://gemini.bortzmeyer.org/rfc-mirror/rfc8805.txt -- Leo's gemini proxy
-- Connecting to gemini.bortzmeyer.org:1965...
-- Connected
-- Sending request
-- Meta line: 20 text/plain
-- Response ended
-- Page fetched on Mon May 6 14:56:40 2024