From 2fce9af0ead43647651907ec781e734a83573f2a Mon Sep 17 00:00:00 2001 From: Igor Chubin Date: Fri, 5 Oct 2018 22:37:35 +0200 Subject: [PATCH] removed Limits class from srv.py --- lib/srv.py | 60 +++--------------------------------------------------- 1 file changed, 3 insertions(+), 57 deletions(-) diff --git a/lib/srv.py b/lib/srv.py index 9ec98d5..1d05b4b 100644 --- a/lib/srv.py +++ b/lib/srv.py @@ -12,7 +12,6 @@ import os import re import requests import socket -import time import json import geoip2.database @@ -41,8 +40,9 @@ from globals import GEOLITE, \ IATA_CODES_FILE, \ log, \ LISTEN_PORT, LISTEN_HOST, PLAIN_TEXT_AGENTS, PLAIN_TEXT_PAGES, \ - IP2LOCATION_KEY + IP2LOCATION_KEY, MY_EXTERNAL_IP +from limits import Limits from wttr import get_wetter, get_moon # pylint: enable=wrong-import-position,wrong-import-order @@ -57,61 +57,7 @@ MY_LOADER = jinja2.ChoiceLoader([ ]) APP.jinja_loader = MY_LOADER -class Limits: - def __init__(self): - self.intervals = ['min', 'hour', 'day'] - self.divisor = { - 'min': 60, - 'hour': 3600, - 'day': 86400, - } - self.counter = { - 'min': {}, - 'hour': {}, - 'day': {}, - } - self.limit = { - 'min': 30, - 'hour': 600, - 'day': 1000, - } - self.last_update = { - 'min': 0, - 'hour': 0, - 'day': 0, - } - self.clear_counters() - - def check_ip(self, ip_address): - """ - check if connections from `ip_address` are allowed - and raise a RuntimeError exception if they are not - """ - if ip_address == '5.9.243.177': - return - self.clear_counters() - for interval in self.intervals: - if ip_address not in self.counter[interval]: - self.counter[interval][ip_address] = 0 - self.counter[interval][ip_address] += 1 - if self.limit[interval] <= self.counter[interval][ip_address]: - log("Too many queries: %s in %s for %s" - % (self.limit[interval], interval, ip_address)) - raise RuntimeError( - "Not so fast! Number of queries per %s is limited to %s" - % (interval, self.limit[interval])) - - def clear_counters(self): - """ - Initialize counters for new interval - """ - t_int = int(time.time()) - for interval in self.intervals: - if t_int / self.divisor[interval] != self.last_update[interval]: - self.counter[interval] = {} - self.last_update[interval] = t_int / self.divisor[interval] - -LIMITS = Limits() +LIMITS = Limits(whitelist=[MY_EXTERNAL_IP], limits=(30, 60, 100)) def is_ip(ip_addr): """