class LineBuffer(object):
    """
    Allows to accumulate data, and then iterate over it getting tokens
    split by line breaks '\n'. If at the end there is no line break, the
    data will sit in the line buffer until more data with line break comes
    in.
    """

    def __init__(self):
        self.buf = ""

    def append(self, data):
        self.buf += data

    def __iter__(self):
        return self

    def __next__(self):
        pos = self.buf.find("\n")
        if pos != -1:
            # Return the text before the line break and drop the break itself.
            result = self.buf[0:pos]
            self.buf = self.buf[pos + 1 :]
            return result
        # No complete line yet: keep the remainder buffered.
        raise StopIteration

    def clean(self):
        self.buf = ""


class SizeBuffer:
    """
    Accumulates bytes and iterates over length-prefixed messages. Each
    message is preceded by its size, encoded as a big-endian integer of
    `size_len` bytes.
    """

    def __init__(self, size_len=2):
        self._buf = b""
        self._size_len = size_len

    def append(self, data):
        self._buf += data

    def __iter__(self):
        return self

    def __next__(self):
        # Wait until the whole size prefix has arrived; decoding a partial
        # prefix would yield a wrong size (e.g. a lone b"\x00" reads as 0).
        if len(self._buf) < self._size_len:
            raise StopIteration
        size = int.from_bytes(self._buf[: self._size_len], "big")
        if len(self._buf) - self._size_len >= size:
            data = self._buf[self._size_len : self._size_len + size]
            self._buf = self._buf[self._size_len + size :]
            return data
        # Message body is still incomplete: keep everything buffered.
        raise StopIteration
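
SizeBuffer only covers the receiving side: it expects every message to be preceded by its length as a big-endian integer of `size_len` bytes. The sketch below illustrates that wire format from the sender's point of view. The helper names `frame_message` and `split_frames` are hypothetical and not part of this module; `split_frames` is a standalone, stateless re-statement of the same parsing idea, included only so the example runs on its own.

```python
def frame_message(payload: bytes, size_len: int = 2) -> bytes:
    """Prefix payload with its length as a big-endian integer of size_len bytes."""
    # int.to_bytes raises OverflowError if the length does not fit the prefix.
    return len(payload).to_bytes(size_len, "big") + payload


def split_frames(stream: bytes, size_len: int = 2):
    """Return (complete_payloads, leftover_bytes) for a length-prefixed stream."""
    payloads = []
    pos = 0
    # Only decode a size once the full prefix is available.
    while len(stream) - pos >= size_len:
        size = int.from_bytes(stream[pos:pos + size_len], "big")
        end = pos + size_len + size
        if end > len(stream):
            break  # body is incomplete; wait for more data
        payloads.append(stream[pos + size_len:end])
        pos = end
    return payloads, stream[pos:]


wire = frame_message(b"hello") + frame_message(b"hi")
# Simulate a partial read: the last byte has not arrived yet.
complete, rest = split_frames(wire[:-1])
print(complete, rest)
```

With the default two-byte prefix a single message can carry at most 65535 bytes; a sender with larger payloads would need a wider `size_len` on both ends.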