Last active
January 26, 2024 21:55
-
-
Save Skimige/ea9d2520ed730a66cf979be32cceae04 to your computer and use it in GitHub Desktop.
Scan IPTV channels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# iptvscan | |
# Script for scanning and saving IPTV playlist. | |
# Python v.3 required for using. https://www.python.org/downloads/ | |
# Author: joddude <joddude@gmail.com> | |
# Disclaimer: | |
# This script is free and provided "as is" without any warranty. | |
# You can use it at your own risk. | |
# The author assumes no responsibility for any moral or material damage caused | |
# by the use of this software, any loss of profit as a result of or during use. | |
# ------------------------------------------------------------------------------ | |
protocol = 'udp' | |
ip_start = '239.93.0.0' | |
ip_end = '239.93.1.255' | |
# ports = [5140] | |
ports = [5140, 6000, 2191, 9008, 2193, 2194, 2192, 2223, 2224, 2226, 2225, 8124, 8004, 4120, 1239, 1238, 1250, 1235, 1251, 1252, 9020, 9024, 2101, 9136, 1292, 2208, 2104, 2201, 9012, 9000, 8012, 9040, 9044, 9048, 9052, 9068, 9072, 9076, 8028, 9148, 9132, 9208, 1281, 1282, 1283, 1284, 8024, 8302, 8138, 1234, 2178, 8224] | |
timeout = 1 # seconds | |
random_search = False # False or True | |
# ------------------------------------------------------------------------------ | |
import socket | |
import struct | |
import sys | |
import time | |
import datetime | |
from random import shuffle | |
# ------------------------------------------------------------------------------ | |
def main(): | |
ip_list = ip_range(ip_start, ip_end) | |
if random_search: | |
shuffle(ip_list) | |
playlist_name = 'IPTV-' + datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.m3u' | |
found_channels = 0 | |
print(f'IP from {ip_start} to {ip_end} ({len(ip_list)} IPs)') | |
print('Playlist name:', playlist_name) | |
with open(playlist_name, 'w') as file: | |
print('#EXTM3U', file=file) | |
print('', file=file) | |
update_progress(0, f'Scan {ip_start}') | |
for port in ports: | |
n = len(ip_list) | |
for counter, ip in enumerate(ip_list[:], start=1): | |
if iptv_test(ip, port, timeout): | |
ip_list.remove(ip) | |
print(f'#EXTINF:-1,{ip}', file=file) | |
print(f'{protocol}://{ip}:{port}', file=file) | |
# print('', file=file) | |
found_channels += 1 | |
time.sleep(1) | |
update_progress(counter / n, f'Scan {ip}:{port}', f'(Found {found_channels:>3} channels)') | |
print(f'Found {found_channels} channels') | |
# ------------------------------------------------------------------------------ | |
def iptv_test(ip, port, timeout=1): | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
sock.settimeout(timeout) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.bind(('', port)) | |
mreq = struct.pack('4sl', socket.inet_aton(ip), socket.INADDR_ANY) | |
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) | |
try: | |
if sock.recv(10240): | |
return True | |
else: | |
return False | |
except socket.timeout: | |
return False | |
# ------------------------------------------------------------------------------ | |
def ip_range(start_ip, end_ip): | |
start = list(map(int, start_ip.split('.'))) | |
end = list(map(int, end_ip.split('.'))) | |
temp = start | |
ip_range = [] | |
ip_range.append(start_ip) | |
while temp != end: | |
start[3] += 1 | |
for i in (3, 2, 1): | |
if temp[i] == 256: | |
temp[i] = 0 | |
temp[i-1] += 1 | |
ip_range.append('.'.join(map(str, temp))) | |
return ip_range | |
# ------------------------------------------------------------------------------ | |
def update_progress(progress, title='Progress', status=''): | |
barLength = 50 | |
if isinstance(progress, int): | |
progress = float(progress) | |
if not isinstance(progress, float): | |
progress = 0 | |
status = "error: progress var must be float\r\n" | |
if progress < 0: | |
progress = 0 | |
status = "Halt"+" "*21+"\r\n" | |
if progress >= 1: | |
progress = 1 | |
status = "Done"+" "*21+"\r\n" | |
block = int(round(barLength*progress)) | |
text = '\r' + title + ':\t[{0}] {1:>3}% {2}'.format('#' * block + '-' * (barLength - block), round(progress * 100), status) | |
sys.stdout.write(text) | |
sys.stdout.flush() | |
# ------------------------------------------------------------------------------ | |
if __name__ == '__main__': | |
try: | |
print('IPTV scan started. Press Ctrl+C to stop.') | |
main() | |
except KeyboardInterrupt: | |
print() | |
print('You pressed Ctrl+C. Stop') | |
sys.exit() | |
except: | |
import sys | |
print(sys.exc_info()[0]) | |
import traceback | |
print(traceback.format_exc()) | |
finally: | |
print('IPTV scan finished. Press Enter to exit ...') | |
input() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment