Source code for sbws.lib.relaylist

import copy
from datetime import datetime, timedelta

from stem.descriptor.router_status_entry import RouterStatusEntryV3
from stem.descriptor.server_descriptor import ServerDescriptor
from stem import Flag, DescriptorUnavailable, ControllerError
import random
import logging
from threading import Lock

from ..globals import MEASUREMENTS_PERIOD

log = logging.getLogger(__name__)


[docs]def remove_old_consensus_timestamps( consensus_timestamps, measurements_period=MEASUREMENTS_PERIOD): """ Remove the consensus timestamps that are older than period for which the measurements are keep from a list of consensus_timestamps. :param list consensus_timestamps: :param int measurements_period: :returns list: a new list of ``consensus_timestamps`` """ oldest_date = datetime.utcnow() - timedelta(measurements_period) new_consensus_timestamps = \ [t for t in consensus_timestamps if t >= oldest_date] return new_consensus_timestamps
[docs]def valid_after_from_network_statuses(network_statuses): """Obtain the consensus Valid-After datetime from the ``document`` attribute of a ``stem.descriptor.RouterStatusEntryV3``. :param list network_statuses: returns datetime: """ for ns in network_statuses: document = getattr(ns, 'document', None) if document: valid_after = getattr(document, 'valid_after', None) if valid_after: return valid_after return datetime.utcnow().replace(microsecond=0)
[docs]class Relay: def __init__(self, fp, cont, ns=None, desc=None, timestamp=None): ''' Given a relay fingerprint, fetch all the information about a relay that sbws currently needs and store it in this class. Acts as an abstraction to hide the confusion that is Tor consensus/descriptor stuff. :param str fp: fingerprint of the relay. :param cont: active and valid stem Tor controller connection :param datatime timestamp: the timestamp of a consensus (RouterStatusEntryV3) from which this relay has been obtained. ''' assert isinstance(fp, str) assert len(fp) == 40 if ns is not None: assert isinstance(ns, RouterStatusEntryV3) self._ns = ns else: try: self._ns = cont.get_network_status(fp, default=None) except (DescriptorUnavailable, ControllerError) as e: log.exception("Exception trying to get ns %s", e) self._ns = None if desc is not None: assert isinstance(desc, ServerDescriptor) self._desc = desc else: try: self._desc = cont.get_server_descriptor(fp, default=None) except (DescriptorUnavailable, ControllerError) as e: log.exception("Exception trying to get desc %s", e) self._consensus_timestamps = [] self._add_consensus_timestamp(timestamp) # The number of times that a relay is "prioritized" to be measured. # It is incremented in ``RelayPrioritizer.best_priority`` self.relay_recent_priority_list_count = 0 # The number of times that a relay has been queued to be measured. # It is incremented in ``scanner.main_loop`` self.relay_recent_measurement_attempt_count = 0 def _from_desc(self, attr): if not self._desc: return None return getattr(self._desc, attr, None) def _from_ns(self, attr): if not self._ns: return None return getattr(self._ns, attr, None) @property def nickname(self): return self._from_ns('nickname') @property def fingerprint(self): return self._from_ns('fingerprint') @property def flags(self): return self._from_ns('flags') @property def exit_policy(self): return self._from_desc('exit_policy') @property def average_bandwidth(self): return self._from_desc('average_bandwidth') @property def burst_bandwidth(self): return self._from_desc('burst_bandwidth') @property def observed_bandwidth(self): return self._from_desc('observed_bandwidth') @property def consensus_bandwidth(self): """Return the consensus bandwidth in Bytes. Consensus bandwidth is the only bandwidth value that is in kilobytes. """ if self._from_ns('bandwidth') is not None: return self._from_ns('bandwidth') * 1000 @property def consensus_bandwidth_is_unmeasured(self): # measured appears only votes, unmeasured appears in consensus # therefore is_unmeasured is needed to know whether the bandwidth # value in consensus is comming from bwauth measurements or not. return self._from_ns('is_unmeasured') @property def address(self): return self._from_ns('address') @property def master_key_ed25519(self): """Obtain ed25519 master key of the relay in server descriptors. :returns: str, the ed25519 master key base 64 encoded without trailing '='s. """ # Even if this key is called master-key-ed25519 in dir-spec.txt, # it seems that stem parses it as ed25519_master_key key = self._from_desc('ed25519_master_key') if key is None: return None return key.rstrip('=') @property def consensus_valid_after(self): """Obtain the consensus Valid-After from the document of this relay network status. """ network_status_document = self._from_ns('document') if network_status_document: return getattr(network_status_document, 'valid_after', None) return None @property def last_consensus_timestamp(self): if len(self._consensus_timestamps) >= 1: return self._consensus_timestamps[-1] return None def _add_consensus_timestamp(self, timestamp=None): """Add the consensus timestamp in which this relay is present. """ # It is possible to access to the relay's consensensus Valid-After if self.consensus_valid_after is not None: # The consensus timestamp list was initialized. if self.last_consensus_timestamp is not None: # Valid-After is more recent than the most recent stored # consensus timestamp. if self.consensus_valid_after > self.last_consensus_timestamp: # Add Valid-After self._consensus_timestamps.append( self.consensus_valid_after ) # The consensus timestamp list was not initialized. else: # Add Valid-After self._consensus_timestamps.append(self.consensus_valid_after) # If there was already a list the timestamp arg is more recent than # the most recent timestamp stored, elif (self.last_consensus_timestamp is not None and timestamp > self.last_consensus_timestamp): # Add the arg timestamp. self._consensus_timestamps.append(timestamp) # In any other case else: # Add the current datetime self._consensus_timestamps.append( datetime.utcnow().replace(microsecond=0)) def _remove_old_consensus_timestamps( self, measurements_period=MEASUREMENTS_PERIOD): self._consensus_timestamps = \ remove_old_consensus_timestamps( copy.deepcopy(self._consensus_timestamps), measurements_period )
[docs] def update_consensus_timestamps(self, timestamp=None): self._add_consensus_timestamp(timestamp) self._remove_old_consensus_timestamps()
@property def relay_in_recent_consensus_count(self): """Number of times the relay was in a conensus.""" return len(self._consensus_timestamps)
[docs] def can_exit_to_port(self, port): """ Returns True if the relay has an exit policy and the policy accepts exiting to the given portself or False otherwise. """ assert isinstance(port, int) # if dind't get the descriptor, there isn't exit policy # When the attribute is gotten in getattr(self._desc, "exit_policy"), # is possible that stem's _input_rules is None and raises an exception # (#29899): # File "/usr/lib/python3/dist-packages/sbws/lib/relaylist.py", line 117, in can_exit_to_port # noqa # if not self.exit_policy: # File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 512, in __len__ # noqa # return len(self._get_rules()) # File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 464, in _get_rules # noqa # for rule in decompressed_rules: # TypeError: 'NoneType' object is not iterable # Therefore, catch the exception here. try: if self.exit_policy: return self.exit_policy.can_exit_to(port=port) except TypeError: return False return False
[docs] def is_exit_not_bad_allowing_port(self, port): return (Flag.BADEXIT not in self.flags and Flag.EXIT in self.flags and self.can_exit_to_port(port))
[docs] def increment_relay_recent_measurement_attempt_count(self): """ Increment The number of times that a relay has been queued to be measured. It is call from :funf:`~sbws.core.scaner.main_loop`. """ # If it was not in the previous measurements version, start counting if self.relay_recent_measurement_attempt_count is None: self.relay_recent_measurement_attempt_count = 0 self.relay_recent_measurement_attempt_count += 1
[docs] def increment_relay_recent_priority_list_count(self): """ The number of times that a relay is "prioritized" to be measured. It is call from :meth:`~sbws.lib.relayprioritizer.RelayPrioritizer.best_priority`. """ # If it was not in the previous measurements version, start counting if self.relay_recent_priority_list_count is None: self.relay_recent_priority_list_count = 0 self.relay_recent_priority_list_count += 1
[docs]class RelayList: ''' Keeps a list of all relays in the current Tor network and updates it transparently in the background. Provides useful interfaces for getting only relays of a certain type. ''' def __init__(self, args, conf, controller, measurements_period=MEASUREMENTS_PERIOD, state=None): self._controller = controller self.rng = random.SystemRandom() self._refresh_lock = Lock() # To track all the consensus seen. self._consensus_timestamps = [] # Initialize so that there's no error trying to access to it. # In future refactor, change to a dictionary, where the keys are # the relays' fingerprint. self._relays = [] # The period of time for which the measurements are keep. self._measurements_period = measurements_period self._state = state # NOTE: blocking: writes to disk if self._state: if self._state.get('recent_measurement_attempt_count', None) \ is None: self._state['recent_measurement_attempt_count'] = 0 self._refresh() def _need_refresh(self): # New consensuses happen every hour. return datetime.utcnow() >= \ self.last_consensus_timestamp + timedelta(seconds=60*60) @property def last_consensus_timestamp(self): """Returns the datetime when the last consensus was obtained.""" if (getattr(self, "_consensus_timestamps") and self._consensus_timestamps): return self._consensus_timestamps[-1] # If the object was not created from __init__, it won't have # consensus_timestamps attribute or it might be empty. # In this case force new update. # Anytime more than 1h in the past will be old. self._consensus_timestamps = [] return datetime.utcnow() - timedelta(seconds=60*61) @property def relays(self): # See if we can get the list of relays without having to do a refresh, # which is expensive and blocks other threads if self._need_refresh(): log.debug('We need to refresh our list of relays. ' 'Going to wait for lock.') # Whelp we couldn't just get the list of relays because the list is # stale. Wait for the lock so we can refresh it. with self._refresh_lock: log.debug('We got the lock. Now to see if we still ' 'need to refresh.') # Now we have the lock ... but wait! Maybe someone else already # did the refreshing. So check if it still needs refreshing. If # not, we can do nothing. if self._need_refresh(): log.debug('Yup we need to refresh our relays. Doing so.') self._refresh() else: log.debug('No we don\'t need to refresh our relays. ' 'It was done by someone else.') log.debug('Giving back the lock for refreshing relays.') return self._relays @property def fast(self): return self._relays_with_flag(Flag.FAST) @property def exits(self): return self._relays_with_flag(Flag.EXIT) @property def bad_exits(self): return self._relays_with_flag(Flag.BADEXIT) @property def non_exits(self): return self._relays_without_flag(Flag.EXIT) @property def guards(self): return self._relays_with_flag(Flag.GUARD) @property def authorities(self): return self._relays_with_flag(Flag.AUTHORITY) @property def relays_fingerprints(self): # Using relays instead of _relays, so that the list get updated if # needed, since this method is used to know which fingerprints are in # the consensus. return [r.fingerprint for r in self.relays]
[docs] def random_relay(self): return self.rng.choice(self.relays)
def _relays_with_flag(self, flag): return [r for r in self.relays if flag in r.flags] def _relays_without_flag(self, flag): return [r for r in self.relays if flag not in r.flags] def _remove_old_consensus_timestamps(self): self._consensus_timestamps = remove_old_consensus_timestamps( copy.deepcopy(self._consensus_timestamps), self._measurements_period ) def _init_relays(self): """Returns a new list of relays that are in the current consensus. And update the consensus timestamp list with the current one. """ c = self._controller # This will get router statuses from this Tor cache, might not be # updated with the network. # Change to stem.descriptor.remote in future refactor. network_statuses = c.get_network_statuses() new_relays_dict = dict([(r.fingerprint, r) for r in network_statuses]) # Find the timestamp of the last consensus. timestamp = valid_after_from_network_statuses(network_statuses) self._consensus_timestamps.append(timestamp) self._remove_old_consensus_timestamps() # Update the relays that were in the previous consensus with the # new timestamp new_relays = [] relays = copy.deepcopy(self._relays) for r in relays: if r.fingerprint in new_relays_dict.keys(): r.update_consensus_timestamps(timestamp) new_relays_dict.pop(r.fingerprint) new_relays.append(r) # Add the relays that were not in the previous consensus # If there was an relay in some older previous consensus, # it won't get stored, so its previous consensuses are lost, # but probably this is fine for now to don't make it more complicated. for fp, ns in new_relays_dict.items(): r = Relay(ns.fingerprint, c, ns=ns, timestamp=timestamp) new_relays.append(r) return new_relays def _refresh(self): # Set a new list of relays. self._relays = self._init_relays() log.info("Number of consensuses obtained in the last %s days: %s.", int(self._measurements_period / 24 / 60 / 60), self.recent_consensus_count) # NOTE: blocking, writes to file! if self._state is not None: self._state['recent_consensus_count'] = self.recent_consensus_count @property def recent_consensus_count(self): """Number of times a new consensus was obtained.""" return len(self._consensus_timestamps)
[docs] def exits_not_bad_allowing_port(self, port): return [r for r in self.exits if r.is_exit_not_bad_allowing_port(port)]
[docs] def increment_recent_measurement_attempt_count(self): """ Increment the number of times that any relay has been queued to be measured. It is call from :funf:`~sbws.core.scaner.main_loop`. It is read and stored in a ``state`` file. """ # NOTE: blocking, writes to file! if self._state: self._state['recent_measurement_attempt_count'] += 1