Server IP : 162.0.209.157 / Your IP : 18.117.254.177 [ Web Server : LiteSpeed System : Linux premium178.web-hosting.com 4.18.0-513.24.1.lve.2.el8.x86_64 #1 SMP Fri May 24 12:42:50 UTC 2024 x86_64 User : balaoqob ( 2395) PHP Version : 8.0.30 Disable Function : NONE Domains : 1 Domains MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /lib64/nagios/plugins/nccustom/ |
Upload File : |
#!/usr/libexec/platform-python
## Created by Andrii Ivanov
## Namecheap
##
## We need to monitor the state of TCP connections from the public IP interface on our servers.
## Check status based on successful TCP probes to destination address:port.
##
##
## Icinga Status
## CRITICAL - counter is equal to 0 successful TCP probes.
## WARNING - counter is less than {--checks_count} and more than 0 successful TCP probes
## OK - counter is equal to {--checks_count} successful TCP probes
##
## Exit codes follow the monitoring-plugin convention: 0 = OK, 1 = WARNING, 2 = CRITICAL.
##

import argparse
import os
import re
import socket
import sys
import threading
from time import sleep, time

# Variables
# Prefixes treated as private/loopback (127/8, 10/8, 172.16/12, 192.168/16).
PRIVATE_REGEX = re.compile(r'(^127\.)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)')
# Dotted-quad shape check; anything else (IPv6, empty tokens) is ignored.
IPV4_REGEX = re.compile(r'^\d{1,3}(?:\.\d{1,3}){3}$')
# Upper bound for threading.active_count() (which includes the main thread)
# before the launcher pauses between thread starts.
MAX_THREADS = 10
# Per-source-IP status ("OK" / "WARNING" / "CRITICAL"), filled in by worker threads.
REPORT = {}

EXIT_OK = 0
EXIT_WARNING = 1
EXIT_CRITICAL = 2


def parse_arguments(argv=None):
    """Parse the command line.

    argv: list of argument strings; None means sys.argv[1:].
    Returns the argparse namespace with integer-typed numeric options.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("-a", "--dst_address", default="google.com", help="Destination address")
    parser.add_argument("-p", "--dst_port", default=80, type=int, help="Destination port")
    parser.add_argument("-t", "--sock_timeout", default=5, type=int, help="Socket connection timeout")
    parser.add_argument("-c", "--checks_count", default=3, type=int, help="Connection attempt count")
    parser.add_argument("-v", "--verbose", default=False, action="store_true", help="Extra output")
    # NOTE: the attribute is True by default (source port is pinned after the
    # first probe) and becomes False when -d is given (fresh port per probe).
    parser.add_argument("-d", "--dynamic_src_port", default=True, action="store_false", help="Dynamic src port for probes")
    parser.add_argument("-l", "--include_private_ip", default=False, action="store_true", help="Include private IP")
    parser.add_argument("-x", "--exclude_ips", nargs="+", help="Exclude IPs")
    return parser.parse_args(argv)


def filter_addresses(candidates, include_private_ip=False, exclude_ips=None):
    """Select the source addresses to probe from a list of candidate strings.

    candidates: iterable of address strings (e.g. tokens of `hostname -I`).
    include_private_ip: also keep addresses matching PRIVATE_REGEX.
    exclude_ips: iterable of patterns; each is used as a regular expression
        anchored at the start of the address, so "10.1" drops every address
        beginning with that prefix.

    Returns a list of kept addresses in their original order. Non-IPv4 tokens
    are dropped. Each address appears at most once per input occurrence, and
    an exclusion that matches an address which was never selected is a no-op.
    """
    # Compile once instead of once per (address, pattern) pair.
    exclusions = [re.compile("^" + pattern) for pattern in (exclude_ips or [])]
    selected = []
    for address in candidates:
        if not IPV4_REGEX.match(address):
            continue
        if PRIVATE_REGEX.match(address) and not include_private_ip:
            continue
        if any(rule.match(address) for rule in exclusions):
            continue
        selected.append(address)
    return selected


def get_source_ip(include_private_ip=False, exclude_ips=None):
    """Return the local IP addresses to probe from, as reported by `hostname -I`.

    Parameters are forwarded to filter_addresses().
    """
    # The context manager closes the pipe and reaps the child process.
    with os.popen('hostname -I') as pipe:
        tokens = pipe.read().split()
    return filter_addresses(tokens, include_private_ip, exclude_ips)


def classify_probe_count(successes, checks_count):
    """Map the number of successful probes to an Icinga status string."""
    if successes == checks_count:
        return "OK"
    if successes > 0:
        return "WARNING"
    return "CRITICAL"


def check_tcp_connection(SRC_IP, dst_address="google.com", dst_port=80,
                         sock_timeout=5, checks_count=3,
                         reuse_src_port=True, verbose=False):
    """Probe dst_address:dst_port from SRC_IP and record the outcome.

    SRC_IP: local address the probing socket is bound to.
    dst_address, dst_port: TCP endpoint to connect to.
    sock_timeout: per-connection timeout in seconds.
    checks_count: number of connection attempts.
    reuse_src_port: when True, the ephemeral port chosen for the first probe
        is reused for the following ones to minimize port usage.
    verbose: print one line per probe.

    Stores the resulting status in REPORT[SRC_IP] and also returns it.
    Socket errors are reported on stdout and count as a failed attempt.
    """
    src_port = 0
    successes = 0
    for _ in range(checks_count):
        started = time()
        try:
            # `with` guarantees the socket is closed even if bind/connect raises.
            with socket.socket() as client_socket:
                client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                client_socket.settimeout(sock_timeout)
                client_socket.bind((SRC_IP, src_port))
                if reuse_src_port:
                    src_port = int(client_socket.getsockname()[1])  # minimize ports usage
                result = client_socket.connect_ex((dst_address, dst_port))
        except socket.error as exc:
            print("%s: %s. Is destination address correct?" % (SRC_IP, exc))
            continue
        elapsed = time() - started
        if result == 0:
            successes += 1
            if verbose:
                print("%s:%s open counter=%d time=%f" % (SRC_IP, src_port, successes, elapsed))
        elif verbose:
            print("%s:%s closed counter=%d time=%f" % (SRC_IP, src_port, successes, elapsed))
        sleep(0.1)
    status = classify_probe_count(successes, checks_count)
    REPORT[SRC_IP] = status
    return status


def build_summary(report):
    """Build the Icinga output line and exit code from a {ip: status} mapping.

    Returns (text, exit_code). Text lists CRITICAL addresses first, then
    WARNING ones, or is "OK" when neither is present. The exit code is
    2 if any address is CRITICAL, else 1 if any is WARNING, else 0.
    """
    critical = [ip for ip, status in report.items() if status == "CRITICAL"]
    warning = [ip for ip, status in report.items() if status == "WARNING"]
    text = ""
    if critical:
        text += "CRITICAL - " + " ".join(critical) + " "
    if warning:
        text += "WARNING - " + " ".join(warning) + " "
    if critical:
        return text, EXIT_CRITICAL
    if warning:
        return text, EXIT_WARNING
    return "OK", EXIT_OK


def main(argv=None):
    """Run the check: probe from every selected source IP and print the result.

    Returns the plugin exit code.
    """
    args = parse_arguments(argv)
    probe_options = {
        "dst_address": args.dst_address,
        "dst_port": args.dst_port,
        "sock_timeout": args.sock_timeout,
        "checks_count": args.checks_count,
        "reuse_src_port": args.dynamic_src_port,
        "verbose": args.verbose,
    }

    # multithreading: one worker per source IP
    workers = [
        threading.Thread(target=check_tcp_connection, args=(src_ip,), kwargs=probe_options)
        for src_ip in get_source_ip(args.include_private_ip, args.exclude_ips)
    ]

    # threads start, throttled so the number of live threads stays bounded
    for worker in workers:
        worker.start()
        while threading.active_count() > MAX_THREADS:
            sleep(0.5)

    # wait for every worker before reading REPORT
    for worker in workers:
        worker.join()

    if args.verbose:
        print(" ")
        print(REPORT)
        print(" ")

    # generating Icinga check result output
    text, exit_code = build_summary(REPORT)
    print(text)
    return exit_code


if __name__ == "__main__":
    sys.exit(main())