Server IP : 162.0.209.157 / Your IP : 18.118.142.101 [ Web Server : LiteSpeed System : Linux premium178.web-hosting.com 4.18.0-513.24.1.lve.2.el8.x86_64 #1 SMP Fri May 24 12:42:50 UTC 2024 x86_64 User : balaoqob ( 2395) PHP Version : 8.0.30 Disable Function : NONE Domains : 1 Domains MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/api/ |
Upload File : |
"""This module implement inactivity tracker for ImunifyAV to automaticaly shutdown the process when it is idle for certain time (no RPC calls and long running tasks). """ import time from contextlib import contextmanager, suppress from logging import getLogger logger = getLogger(__name__) class InactivityTracker: def __init__(self): self._last_action_timestamp = time.monotonic() self._long_action_counter = 0 self._long_actions_list = [] self._timeout = 0 def __str__(self): return "Time from last action is {}, long actions {}".format( time.monotonic() - self._last_action_timestamp, self._long_actions_list, ) @contextmanager def task(self, name): self.start(name) try: yield finally: self.stop(name) def reset_timer(self): self._last_action_timestamp = time.monotonic() def start(self, name): self._long_action_counter += 1 self._long_actions_list.append(name) self.reset_timer() def stop(self, name): self._long_action_counter -= 1 with suppress(ValueError): self._long_actions_list.remove(name) self.reset_timer() def is_timeout(self): return (not self._long_action_counter) and ( self._last_action_timestamp + self._timeout <= time.monotonic() ) def set_timeout(self, timeout: int) -> None: self._timeout = timeout track = InactivityTracker()