Last active
October 5, 2023 15:49
-
-
Save damp11113/f5207eb45a05f02f359da59c006aa5dc to your computer and use it in GitHub Desktop.
The first prototype EAS (Emergency Alert System) for Thailand, using SAME (Specific Area Message Encoding) implemented in Python. For use with radio or audio streaming.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# same encoder for thailand by damp11113 | |
import datetime | |
import numpy as np | |
import pyaudio | |
import time | |
import soundfile as sf | |
from scipy import signal | |
import os | |
import pyttsx3 | |
os.environ["damp11113_load_all_module"] = "NO" | |
os.environ["damp11113_check_update"] = "NO" | |
from damp11113.DSP import tonegen, FSKEncoderV3 | |
import voicemeeter | |
from damp11113.processbar import LoadingProgress | |
from damp11113.file import removefile | |
""" | |
This documentation was written by damp11113.

EAS header format (Thailand version)

ZCZC-ORG-EEE-AAGGPP000+TTTT-JJJHHMM-LLLLLLLL-

ZCZC      = Start of header
ORG       = Originator code: EAS, CIV, WXR or PEP
EEE       = Event code (https://en.wikipedia.org/wiki/Specific_Area_Message_Encoding#:~:text=the%20EAS%27%20predecessor.-,Event%20codes,-%5Bedit%5D)
AAGGPP000 = AA  = Area (00 = all)
            GG  = Government (00 = all governments in the province)
            PP  = Provincial (05 = Phatthaya, 00 = whole country)
            000 = Country code (use the default 000 when used inside the country)
TTTT      = Purge time (HHMM) (0000 = testing)
            HH  = hours
            MM  = minutes
JJJHHMM   = Issue time
            JJJ = Julian day (day of the year)
            HH  = hour
            MM  = minute
LLLLLLLL  = Station callsign, i.e. the sender

# more info https://en.wikipedia.org/wiki/Specific_Area_Message_Encoding
# country ref. http://www.statoids.com/uth.html http://www.statoids.com/ https://en.wikipedia.org/wiki/ISO_3166-2:TH
""" | |
# --- Alert configuration ----------------------------------------------------
# These values are substituted into the SAME header string built at the end of
# this section (EAScode).
Originator = "EAS"  # one of: EAS CIV WXR PEP
Event = "DMO"

# Location
Area = "00"
Government = "03"
Provincial = "83"

# Time
Purge = "0000"
# Day-of-year + hour + minute, taken from the local clock.
# NOTE(review): the US SAME standard uses UTC here - confirm which one the
# Thai variant is meant to use.
Issue = datetime.datetime.now().strftime("%j%H%M")

# NOTE(review): the format description reserves 8 characters for the callsign,
# but it is left empty here - fill in before real use.
Sender = ""

# Text that is rendered to speech and played between the attention tone and
# the end-of-message bursts.
message = """
This is a test of the Emergency Alert System. This is only a test.
The broadcasters in your area, in voluntary cooperation with federal, state, and local authorities, have developed this system to keep you informed in the event of an emergency.
If this had been an actual emergency, an official message would have followed the alert tone.
This concludes this test of the Emergency Alert System.
"""

# ------------------------------------------------------------------------------------------
# Connect to Voicemeeter Banana; the playback section uses it to mute the
# main audio input while the alert is on air.
vmr = voicemeeter.remote('banana')
vmr.login()

progress = LoadingProgress(8, desc="EAS Encoder", enabuinstatus=False, clearline=False)
progress.start()
progress.status = "Encoding Message..."

# Temporary WAV file that receives the synthesized speech.
#filename = r"C:\Users\sansw\Desktop\EAS test.wav"
filename = "message.wav"

engine = pyttsx3.init()
engine.setProperty('volume', 1)  # Volume 0-1
engine.setProperty('rate', 150)  #148
#engine.setProperty('voice', r"HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Speech\Voices\Tokens\TTS_THAI")  # thai lang
engine.save_to_file(message, filename)
engine.runAndWait()

# Load the rendered speech back into memory; the file itself is no longer
# needed once it has been read, so it is removed straight away.
with sf.SoundFile(filename, 'r') as file:
    file_sample_rate = file.samplerate
    audio_data = file.read()
removefile(filename)

# The output stream is opened with a single channel, so a multi-channel
# recording (frames x channels) is averaged down to mono before resampling;
# otherwise the channels would be played back interleaved.
if audio_data.ndim > 1:
    audio_data = audio_data.mean(axis=1)

# Convert the speech to the 48 kHz rate used for every other signal.
resampled_audio_data = signal.resample(audio_data, int(len(audio_data) * 48000 / file_sample_rate))

# SAME header. The trailing "-" after the callsign is part of the format
# (see the format description: "...-JJJHHMM-LLLLLLLL-").
# NOTE(review): the format description also lists a "000" country code after
# the location digits, which is not emitted here - confirm whether the 6-digit
# location is intentional before changing it.
EAScode = f"ZCZC-{Originator}-{Event}-{Area}{Government}{Provincial}+{Purge}-{Issue}-{Sender}-"
def preamble(samplerate=48000, baudrate=100, tone1=1000, tone2=2000):
    """Generate the FSK preamble as a 1-D float array of audio samples.

    The preamble is the bit sequence 1,1,0,1,0,1,0,1 (0xAB, least significant
    bit first) repeated sixteen times.

    Args:
        samplerate: Output sample rate in Hz.
        baudrate: Bits per second; each bit spans
            ``int((1.0 / baudrate) * samplerate)`` samples.
        tone1: Frequency in Hz used for a 0 bit (full amplitude).
        tone2: Frequency in Hz used for a 1 bit (scaled to 0.8 amplitude).

    Returns:
        numpy.ndarray holding ``16 * 8 * samples_per_bit`` samples.
    """
    bit_duration = 1.0 / baudrate
    samples_per_bit = int(bit_duration * samplerate)
    sample_index = np.arange(samples_per_bit)

    # Every bit restarts at phase zero, so one waveform per bit value is enough.
    one_wave = np.sin(2 * np.pi * tone2 * sample_index / samplerate) * 0.8
    zero_wave = np.sin(2 * np.pi * tone1 * sample_index / samplerate)

    pattern = (1, 1, 0, 1, 0, 1, 0, 1)
    single_byte = np.concatenate([one_wave if bit else zero_wave for bit in pattern])
    return np.tile(single_byte, 16)
# --- Build the audio segments -----------------------------------------------
# Preamble plus FSK-encoded header, and preamble plus the "NNNN" end-of-message
# marker, all at 520 5/6 baud with 1562.5 Hz / 2083 1/3 Hz tones.
EASpreamble = preamble(baudrate=520+(5/6), tone1=1562.5, tone2=2083+(1/3))
AFSKdata = FSKEncoderV3(EAScode, baudrate=520+(5/6), tone1=1562.5, tone2=2083+(1/3))
AFSKdataend = FSKEncoderV3("NNNN", baudrate=520+(5/6), tone1=1562.5, tone2=2083+(1/3))

# Attention signal: equal mix of an 853 Hz and a 960 Hz tone.
altone853 = tonegen(853, 5)
altone960 = tonegen(960, 5)
altone = ((altone853 + altone960) * 0.5).astype(np.float32).tobytes()

header = np.concatenate((EASpreamble, AFSKdata))
tail = np.concatenate((EASpreamble, AFSKdataend))

# Convert once instead of on every repetition of the burst.
header_bytes = header.astype(np.float32).tobytes()
tail_bytes = tail.astype(np.float32).tobytes()
message_bytes = resampled_audio_data.astype(np.float32).tobytes()
progress.update(1)

# --- Play the alert ---------------------------------------------------------
progress.status = "Starting EAS Audio Device..."
p = pyaudio.PyAudio()
stream = None
muted = False
try:
    stream = p.open(format=pyaudio.paFloat32, channels=1, rate=48000, output=True, output_device_index=20)
    progress.update(1)

    progress.status = "Mute Main Audio"
    vmr.inputs[3].gain = -60
    muted = True
    time.sleep(1)
    progress.update(1)

    progress.status = "Sending Header..."
    # The header burst is sent three times.
    # NOTE(review): the one-second pause is assumed to follow each burst
    # (SAME separates bursts by one second) - confirm against the original
    # indentation.
    for _ in range(3):
        stream.write(header_bytes)
        time.sleep(1)
    progress.update(1)

    progress.status = "Sending Tone..."
    stream.write(altone)
    time.sleep(1)
    progress.update(1)

    progress.status = "Sending Messages..."
    stream.write(message_bytes)
    time.sleep(1)
    progress.update(1)

    progress.status = "Sending EOM..."
    # End-of-message burst, also sent three times with the same pause.
    for _ in range(3):
        stream.write(tail_bytes)
        time.sleep(1)
    progress.update(1)

    progress.status = "Stopping EAS Audio Device..."
finally:
    # Always release the audio device and restore the main audio, even when
    # playback fails part-way through; otherwise the main input would stay
    # muted at -60 dB.
    try:
        if stream is not None:
            stream.stop_stream()
            stream.close()
        p.terminate()
    finally:
        if muted:
            progress.update(1)
            progress.status = "UnMute Main Audio..."
            vmr.inputs[3].gain = 0
            time.sleep(1)
        progress.stop()
        # Pair the earlier vmr.login() with a logout.
        vmr.logout()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment