Skip to content

Instantly share code, notes, and snippets.

@Mahyar24
Last active April 17, 2024 20:22
Show Gist options
  • Save Mahyar24/ff11ed7973bbe3a37b1caedef40ff850 to your computer and use it in GitHub Desktop.
Save Mahyar24/ff11ed7973bbe3a37b1caedef40ff850 to your computer and use it in GitHub Desktop.
Checking Abnormal V2Ray Users (Polars)
#! /usr/bin/env python3.10
"""
This module is used for checking excessive use by V2Ray users.
Install Polars beforehand via: `pip install polars`
Sample Command:
$ journalctl -u v2ray --since "72 h ago" | grep email | cut -d " " -f 6,7,8,13 | python abnormal.py -
GitHub: https://github.com/Mahyar24/V2Conf
Mahyar@Mahyar24.com, Mon 28 Nov 2022
"""
import io
import ipaddress
import sys
import warnings
from functools import cache
import polars as pl
# Silence every Python warning for the rest of the run (presumably to keep
# library warnings out of the printed report).
warnings.filterwarnings("ignore")
@cache
def ip_to_network(ip: str, subnet: int = 16) -> str:
    """
    Return the network address that contains `ip`, as a string.

    IPv4 addresses are masked with a `subnet`-bit prefix. Native IPv6
    addresses are always masked with a /48 prefix; `subnet` is ignored for
    them. IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are unwrapped and
    handled as the IPv4 address they carry.

    Results are memoized per (ip, subnet) argument combination.

    Raises:
        ValueError: if `ip` is not a valid IP address or `subnet` is not a
            valid prefix length for the address family.
    """
    address = ipaddress.ip_address(ip)
    if address.version == 6:
        mapped = address.ipv4_mapped
        if mapped is not None:
            # A /48 mask would reduce every IPv4-mapped address to "::",
            # making all such clients look like one network. Use the
            # embedded IPv4 address with the IPv4 prefix length instead.
            address = mapped
        else:
            subnet = 48
    # strict=False: host bits are expected to be set and are masked off.
    network = ipaddress.ip_network(f"{address}/{subnet}", strict=False)
    return str(network.network_address)
def make_df(input_data, subnet: int = 16) -> pl.LazyFrame:
    """
    Parse space-separated log records into a lazy frame for analysis.

    `input_data` is handed straight to `pl.read_csv`; it must contain four
    header-less, space-separated fields per line: date, time, source address
    and username.

    The resulting lazy frame has three columns:
      * ``dt``   - date and time joined and parsed as ``%Y/%m/%d %H:%M:%S``.
      * ``user`` - the part of the username before the first "@"
                   (Categorical).
      * ``ip``   - the network address of the source, computed by
                   `ip_to_network` with the given `subnet`.

    The frame is flagged as sorted by ``dt`` without being sorted here, so
    the input is assumed to already be in chronological order.
    """
    raw = pl.read_csv(
        input_data,
        separator=" ",
        has_header=False,
        new_columns=["date", "time", "ip", "username"],
    )

    timestamp = (
        pl.concat_str([pl.col("date"), pl.lit(" "), pl.col("time")])
        .str.strptime(pl.Datetime, "%Y/%m/%d %H:%M:%S")
        .alias("dt")
    )

    user = (
        pl.col("username")
        .str.split("@")
        .list.first()
        .cast(pl.Categorical)
        .alias("user")
    )

    network = (
        pl.col("ip")
        # Drop the trailing ":<port>" (everything after the last colon).
        .str.extract(r"(.*):[^:]*$")
        # Remove an optional protocol tag. `strip_prefix` removes the literal
        # prefix only; `strip_chars_start` would treat its argument as a set
        # of characters and could strip leading characters of the address.
        .str.strip_prefix("tcp:")
        .str.strip_prefix("udp:")
        # Remove the square brackets that wrap IPv6 literals.
        .str.strip_chars("[]")
        .map_elements(
            lambda address: ip_to_network(address, subnet=subnet),
            return_dtype=pl.String,
        )
    )

    return raw.lazy().select(timestamp, user, network).set_sorted(pl.col("dt"))
def analyze_df(df: pl.DataFrame | pl.LazyFrame, conn: int = 2) -> pl.DataFrame:
    """
    Summarize, per user, how often and by how much a usage limit is exceeded.

    `df` must provide the columns ``dt``, ``user`` and ``ip`` and may be
    either an eager or a lazy frame.

    Steps:
      1. Per user and 30-second slot, count the distinct ``ip`` values.
      2. Per user and 30-minute window, sum those slot counts.
      3. Keep only the windows whose sum is greater than ``conn * 60``.
      4. Per user, report ``No`` (how many windows were kept) and
         ``Excessing`` (mean window sum expressed as the relative excess
         over the threshold, i.e. ``(mean - threshold) / threshold``).

    The result is sorted by ``No`` then ``Excessing``, both descending, and
    returned as a collected (eager) DataFrame.
    """
    # A 30-minute window holds 60 thirty-second slots, so this is presumably
    # "conn distinct networks per slot, sustained for a whole window".
    threshold = conn * 60

    distinct_per_slot = (
        # `.lazy()` is a no-op on a LazyFrame and lets an eager DataFrame
        # through the same pipeline, which ends in `.collect()`.
        df.lazy()
        .group_by_dynamic("dt", every="30s", group_by=pl.col("user"), label="left")
        .agg(pl.col("ip").n_unique())
    )

    total_per_window = distinct_per_slot.group_by_dynamic(
        "dt", every="30m", group_by=pl.col("user"), label="left"
    ).agg(pl.col("ip").sum())

    report = (
        total_per_window.filter(pl.col("ip") > threshold)
        .group_by(pl.col("user"))
        .agg(
            [
                pl.col("ip").count().alias("No"),
                pl.col("ip")
                .mean()
                .sub(threshold)
                .truediv(threshold)
                .alias("Excessing"),
            ]
        )
        .sort(pl.col("No"), pl.col("Excessing"), descending=True)
    )

    return report.collect()
def main() -> None:
    """
    Read log records from standard input and print the usage report.

    Prints the full per-user table first, then the subset of users with at
    least 5 offending windows and an excess ratio of at least 0.2.
    """
    print("Notable Users:\n")
    # The header is printed before stdin is consumed and analyzed.
    report = analyze_df(make_df(io.StringIO(sys.stdin.read()), subnet=16))
    print(report)
    print("---------------------------")

    frequent = pl.col("No") >= 5
    heavy = pl.col("Excessing") >= 0.2
    abnormal = report.filter(frequent & heavy)

    if not abnormal.is_empty():
        print(f"Abnormal Users:\n\n{abnormal}")
    else:
        print("Abnormal Users: None!")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment