@ -12,9 +12,9 @@ import random
import unittest
from enum import Enum
from functools import total_ordering
from typing import Callable , Dict, Iterable, List, Optional, Tuple , Union , overload
from typing import Callable , Iterable, Optional, Union , overload
def net_to_prefix ( net : Union [ ipaddress . IPv4Network , ipaddress . IPv6Network ] ) - > L ist[ bool ] :
def net_to_prefix ( net : Union [ ipaddress . IPv4Network , ipaddress . IPv6Network ] ) - > l ist[ bool ] :
"""
Convert an IPv4 or IPv6 network to a prefix represented as a list of bits .
@ -32,7 +32,7 @@ def net_to_prefix(net: Union[ipaddress.IPv4Network,ipaddress.IPv6Network]) -> Li
assert ( netrange & ( ( 1 << ( 128 - num_bits ) ) - 1 ) ) == 0
return [ ( ( netrange >> ( 127 - i ) ) & 1 ) != 0 for i in range ( num_bits ) ]
def prefix_to_net ( prefix : L ist[ bool ] ) - > Union [ ipaddress . IPv4Network , ipaddress . IPv6Network ] :
def prefix_to_net ( prefix : l ist[ bool ] ) - > Union [ ipaddress . IPv4Network , ipaddress . IPv6Network ] :
""" The reverse operation of net_to_prefix. """
# Convert to number
netrange = sum ( b << ( 127 - i ) for i , b in enumerate ( prefix ) )
@ -47,10 +47,10 @@ def prefix_to_net(prefix: List[bool]) -> Union[ipaddress.IPv4Network,ipaddress.I
return ipaddress . IPv6Network ( ( netrange , num_bits ) , True )
# Shortcut for (prefix, ASN) entries.
ASNEntry = Tuple [ L ist[ bool ] , int ]
ASNEntry = tuple [ l ist[ bool ] , int ]
# Shortcut for (prefix, old ASN, new ASN) entries.
ASNDiff = Tuple [ L ist[ bool ] , int , int ]
ASNDiff = tuple [ l ist[ bool ] , int , int ]
class _VarLenCoder :
"""
@ -75,7 +75,7 @@ class _VarLenCoder:
other classes start one past the last element of the class before it .
"""
def __init__ ( self , minval : int , clsbits : L ist[ int ] ) :
def __init__ ( self , minval : int , clsbits : l ist[ int ] ) :
""" Construct a new _VarLenCoder. """
self . _minval = minval
self . _clsbits = clsbits
@ -85,7 +85,7 @@ class _VarLenCoder:
""" Check whether value val is in the range this coder supports. """
return self . _minval < = val < = self . _maxval
def encode ( self , val : int , ret : L ist[ int ] ) - > None :
def encode ( self , val : int , ret : l ist[ int ] ) - > None :
""" Append encoding of val onto integer list ret. """
assert self . _minval < = val < = self . _maxval
@ -120,7 +120,7 @@ class _VarLenCoder:
break
return ret + bits
def decode ( self , stream , bitpos ) - > T uple[ int , int ] :
def decode ( self , stream , bitpos ) - > t uple[ int , int ] :
""" Decode a number starting at bitpos in stream, returning value and new bitpos. """
val = self . _minval
bits = 0
@ -281,11 +281,11 @@ class ASMap:
- mappings , represented by new trie nodes .
"""
def update ( self , prefix : L ist[ bool ] , asn : int ) - > None :
def update ( self , prefix : l ist[ bool ] , asn : int ) - > None :
""" Update this ASMap object to map prefix to the specified asn. """
assert asn == 0 or _CODER_ASN . can_encode ( asn )
def recurse ( node : L ist, offset : int ) - > None :
def recurse ( node : l ist, offset : int ) - > None :
if offset == len ( prefix ) :
# Reached the end of prefix; overwrite this node.
node . clear ( )
@ -306,7 +306,7 @@ class ASMap:
node . append ( oldasn )
recurse ( self . _trie , 0 )
def update_multi ( self , entries : List [ Tuple [ L ist[ bool ] , int ] ] ) - > None :
def update_multi ( self , entries : list [ tuple [ l ist[ bool ] , int ] ] ) - > None :
""" Apply multiple update operations, where longer prefixes take precedence. """
entries . sort ( key = lambda entry : len ( entry [ 0 ] ) )
for prefix , asn in entries :
@ -314,7 +314,7 @@ class ASMap:
def _set_trie ( self , trie ) - > None :
""" Set trie directly. Internal use only. """
def recurse ( node : L ist) - > None :
def recurse ( node : l ist) - > None :
if len ( node ) < 2 :
return
recurse ( node [ 0 ] )
@ -342,7 +342,7 @@ class ASMap:
for prefix , asn in sorted ( entries , key = entry_key ) :
self . update ( prefix , asn )
def lookup ( self , prefix : L ist[ bool ] ) - > Optional [ int ] :
def lookup ( self , prefix : l ist[ bool ] ) - > Optional [ int ] :
""" Look up a prefix. Returns ASN, or 0 if unassigned, or None if indeterminate. """
node = self . _trie
for bit in prefix :
@ -353,11 +353,11 @@ class ASMap:
return node [ 0 ]
return None
def _to_entries_flat ( self , fill : bool = False ) - > L ist[ ASNEntry ] :
def _to_entries_flat ( self , fill : bool = False ) - > l ist[ ASNEntry ] :
""" Convert an ASMap object to a list of non-overlapping (prefix, asn) objects. """
prefix : L ist[ bool ] = [ ]
prefix : l ist[ bool ] = [ ]
def recurse ( node : List ) - > L ist[ ASNEntry ] :
def recurse ( node : list ) - > l ist[ ASNEntry ] :
ret = [ ]
if len ( node ) == 1 :
if node [ 0 ] > 0 :
@ -375,24 +375,24 @@ class ASMap:
return ret
return recurse ( self . _trie )
def _to_entries_minimal ( self , fill : bool = False ) - > L ist[ ASNEntry ] :
def _to_entries_minimal ( self , fill : bool = False ) - > l ist[ ASNEntry ] :
""" Convert a trie to a minimal list of ASNEntry objects, exploiting overlap. """
prefix : L ist[ bool ] = [ ]
prefix : l ist[ bool ] = [ ]
def recurse ( node : List ) - > ( Tuple [ Dict [ Optional [ int ] , L ist[ ASNEntry ] ] , bool ] ) :
def recurse ( node : list ) - > ( tuple [ dict [ Optional [ int ] , l ist[ ASNEntry ] ] , bool ] ) :
if len ( node ) == 1 and node [ 0 ] == 0 :
return { None if fill else 0 : [ ] } , True
if len ( node ) == 1 :
return { node [ 0 ] : [ ] , None : [ ( list ( prefix ) , node [ 0 ] ) ] } , False
ret : Dict [ Optional [ int ] , L ist[ ASNEntry ] ] = { }
ret : dict [ Optional [ int ] , l ist[ ASNEntry ] ] = { }
prefix . append ( False )
left , lhole = recurse ( node [ 0 ] )
prefix [ - 1 ] = True
right , rhole = recurse ( node [ 1 ] )
prefix . pop ( )
hole = not fill and ( lhole or rhole )
def candidate ( ctx : Optional [ int ] , res0 : Optional [ L ist[ ASNEntry ] ] ,
res1 : Optional [ L ist[ ASNEntry ] ] ) :
def candidate ( ctx : Optional [ int ] , res0 : Optional [ l ist[ ASNEntry ] ] ,
res1 : Optional [ l ist[ ASNEntry ] ] ) :
if res0 is not None and res1 is not None :
if ctx not in ret or len ( res0 ) + len ( res1 ) < len ( ret [ ctx ] ) :
ret [ ctx ] = res0 + res1
@ -417,7 +417,7 @@ class ASMap:
""" Convert this ASMap object to a string containing Python code constructing it. """
return f " ASMap( { self . _trie } ) "
def to_entries ( self , overlapping : bool = True , fill : bool = False ) - > L ist[ ASNEntry ] :
def to_entries ( self , overlapping : bool = True , fill : bool = False ) - > l ist[ ASNEntry ] :
"""
Convert the mappings in this ASMap object to a list of ASNEntry objects .
@ -448,7 +448,7 @@ class ASMap:
assert max_asn > = 1 or unassigned_prob == 1
assert _CODER_ASN . can_encode ( max_asn )
assert 0.0 < = unassigned_prob < = 1.0
trie : L ist = [ ]
trie : l ist = [ ]
leaves = [ trie ]
ret = ASMap ( )
for i in range ( 1 , num_leaves ) :
@ -472,12 +472,12 @@ class ASMap:
def _to_binnode ( self , fill : bool = False ) - > _BinNode :
""" Convert a trie to a _BinNode object. """
def recurse ( node : List ) - > Tuple [ D ict[ Optional [ int ] , _BinNode ] , bool ] :
def recurse ( node : list ) - > tuple [ d ict[ Optional [ int ] , _BinNode ] , bool ] :
if len ( node ) == 1 and node [ 0 ] == 0 :
return { ( None if fill else 0 ) : _BinNode . make_end ( ) } , True
if len ( node ) == 1 :
return { None : _BinNode . make_leaf ( node [ 0 ] ) , node [ 0 ] : _BinNode . make_end ( ) } , False
ret : D ict[ Optional [ int ] , _BinNode ] = { }
ret : d ict[ Optional [ int ] , _BinNode ] = { }
left , lhole = recurse ( node [ 0 ] )
right , rhole = recurse ( node [ 1 ] )
hole = ( lhole or rhole ) and not fill
@ -507,7 +507,7 @@ class ASMap:
@staticmethod
def _from_binnode ( binnode : _BinNode ) - > " ASMap " :
""" Construct an ASMap object from a _BinNode. Internal use only. """
def recurse ( node : _BinNode , default : int ) - > L ist:
def recurse ( node : _BinNode , default : int ) - > l ist:
if node . ins == _Instruction . RETURN :
return [ node . arg1 ]
if node . ins == _Instruction . JUMP :
@ -542,7 +542,7 @@ class ASMap:
Returns :
A bytes object with the encoding of this ASMap object .
"""
bits : L ist[ int ] = [ ]
bits : l ist[ int ] = [ ]
def recurse ( node : _BinNode ) - > None :
_CODER_INS . encode ( node . ins . value , bits )
@ -582,11 +582,11 @@ class ASMap:
def from_binary ( bindata : bytes ) - > Optional [ " ASMap " ] :
""" Decode an ASMap object from the provided binary encoding. """
bits : L ist[ int ] = [ ]
bits : l ist[ int ] = [ ]
for byte in bindata :
bits . extend ( ( byte >> i ) & 1 for i in range ( 8 ) )
def recurse ( bitpos : int ) - > T uple[ _BinNode , int ] :
def recurse ( bitpos : int ) - > t uple[ _BinNode , int ] :
insval , bitpos = _CODER_INS . decode ( bits , bitpos )
ins = _Instruction ( insval )
if ins == _Instruction . RETURN :
@ -632,7 +632,7 @@ class ASMap:
def extends ( self , req : " ASMap " ) - > bool :
""" Determine whether this matches req for all subranges where req is assigned. """
def recurse ( actual : List , require : L ist) - > bool :
def recurse ( actual : list , require : l ist) - > bool :
if len ( require ) == 1 and require [ 0 ] == 0 :
return True
if len ( require ) == 1 :
@ -646,20 +646,20 @@ class ASMap:
#pylint: disable=protected-access
return recurse ( self . _trie , req . _trie )
def diff ( self , other : " ASMap " ) - > L ist[ ASNDiff ] :
def diff ( self , other : " ASMap " ) - > l ist[ ASNDiff ] :
""" Compute the diff from self to other. """
prefix : L ist[ bool ] = [ ]
ret : L ist[ ASNDiff ] = [ ]
prefix : l ist[ bool ] = [ ]
ret : l ist[ ASNDiff ] = [ ]
def recurse ( old_node : List , new_node : L ist) :
def recurse ( old_node : list , new_node : l ist) :
if len ( old_node ) == 1 and len ( new_node ) == 1 :
if old_node [ 0 ] != new_node [ 0 ] :
ret . append ( ( list ( prefix ) , old_node [ 0 ] , new_node [ 0 ] ) )
else :
old_left : L ist = old_node if len ( old_node ) == 1 else old_node [ 0 ]
old_right : L ist = old_node if len ( old_node ) == 1 else old_node [ 1 ]
new_left : L ist = new_node if len ( new_node ) == 1 else new_node [ 0 ]
new_right : L ist = new_node if len ( new_node ) == 1 else new_node [ 1 ]
old_left : l ist = old_node if len ( old_node ) == 1 else old_node [ 0 ]
old_right : l ist = old_node if len ( old_node ) == 1 else old_node [ 1 ]
new_left : l ist = new_node if len ( new_node ) == 1 else new_node [ 0 ]
new_right : l ist = new_node if len ( new_node ) == 1 else new_node [ 1 ]
prefix . append ( False )
recurse ( old_left , new_left )
prefix [ - 1 ] = True
@ -760,7 +760,7 @@ class TestASMap(unittest.TestCase):
# It starts off being equal to asmap.
patched = copy . copy ( asmap )
# Keep a list of patches performed.
patches : L ist[ ASNEntry ] = [ ]
patches : l ist[ ASNEntry ] = [ ]
# Initially there cannot be any difference.
self . assertEqual ( asmap . diff ( patched ) , [ ] )
# Make 5 patches, each building on top of the previous ones.