Skip to content

Instantly share code, notes, and snippets.

@damp11113
Last active October 5, 2023 15:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save damp11113/f5207eb45a05f02f359da59c006aa5dc to your computer and use it in GitHub Desktop.
Save damp11113/f5207eb45a05f02f359da59c006aa5dc to your computer and use it in GitHub Desktop.
The first prototype EAS (Emergency Alert System) for Thailand, using SAME (Specific Area Message Encoding) with Python. For use with radio or audio streaming.
# same encoder for thailand by damp11113
import datetime
import numpy as np
import pyaudio
import time
import soundfile as sf
from scipy import signal
import os
import pyttsx3
os.environ["damp11113_load_all_module"] = "NO"
os.environ["damp11113_check_update"] = "NO"
from damp11113.DSP import tonegen, FSKEncoderV3
import voicemeeter
from damp11113.processbar import LoadingProgress
from damp11113.file import removefile
"""
Documentation by damp11113
EAS header format (Thailand version)
ZCZC-ORG-EEE-AAGGPP000+TTTT-JJJHHMM-LLLLLLLL-
ZCZC = Start of header
ORG = Originator code (EAS, CIV, WXR or PEP)
EEE = Event code (https://en.wikipedia.org/wiki/Specific_Area_Message_Encoding#:~:text=the%20EAS%27%20predecessor.-,Event%20codes,-%5Bedit%5D)
AAGGPP000 = AA = Area (00 = all areas)
GG = Government (00 = all governments in the province)
PP = Provincial (05 = Phatthaya, 00 = whole country)
000 = Country code (use the default 000 for alerts inside the country)
TTTT = Purge time (HHMM) (0000 = testing) HH = hours
MM = minutes
JJJHHMM = Issue time: JJJ = Julian day (day of the year)
HH = hour
MM = minute
LLLLLLLL = Station callsign, i.e. the sender
# More info: https://en.wikipedia.org/wiki/Specific_Area_Message_Encoding
# Country references: http://www.statoids.com/uth.html http://www.statoids.com/ https://en.wikipedia.org/wiki/ISO_3166-2:TH
"""
# --- Alert header fields -------------------------------------------------
Originator = "EAS"  # one of: EAS, CIV, WXR, PEP
Event = "DMO"

# Location code parts; the header joins them as Area + Government + Provincial.
Area, Government, Provincial = "00", "03", "83"

# Timing fields.
Purge = "0000"
# Day-of-year plus HHMM, taken from the local clock when the script starts.
Issue = f"{datetime.datetime.now():%j%H%M}"

# Station callsign placed at the end of the header (empty by default).
Sender = ""

# Text handed to the text-to-speech engine and played after the attention tone.
message = """
This is a test of the Emergency Alert System. This is only a test.
The broadcasters in your area, in voluntary cooperation with federal, state, and local authorities, have developed this system to keep you informed in the event of an emergency.
If this had been an actual emergency, an official message would have followed the alert tone.
This concludes this test of the Emergency Alert System.
"""
# ------------------------------------------------------------------------------------------
# Open a Voicemeeter remote session (kind 'banana'); it is used later to
# change the gain of the main audio input around the alert.
vmr = voicemeeter.remote('banana')
vmr.login()

# Progress indicator sized for the eight steps reported by the script.
progress = LoadingProgress(8, desc="EAS Encoder", enabuinstatus=False, clearline=False)
progress.start()
progress.status = "Encoding Message..."

# Render the spoken message to a temporary WAV file with pyttsx3.
# Alternative output path used by the author:
#filename = r"C:\Users\sansw\Desktop\EAS test.wav"
filename = "message.wav"
engine = pyttsx3.init()
engine.setProperty('volume', 1)  # volume range is 0-1
engine.setProperty('rate', 150)  # previously 148
# To select a Thai voice instead of the default one:
#engine.setProperty('voice', "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Speech\Voices\Tokens\TTS_THAI") # thai lang
engine.save_to_file(message, filename)
engine.runAndWait()

# Load the rendered speech together with the sample rate it was written at.
with sf.SoundFile(filename, 'r') as file:
    file_sample_rate = file.samplerate
    audio_data = file.read()

# Resample to 48 kHz, the rate the output stream is opened with further down.
resampled_audio_data = signal.resample(
    audio_data,
    int(len(audio_data) * 48000 / file_sample_rate),
)

# Same day-of-year + HHMM format as the Issue field; not referenced again in this script.
sameCompatibleTimestamp = datetime.datetime.now().strftime("%j%H%M")

# The WAV file was only an intermediate artefact.
removefile(filename)
# Assemble the SAME header text that is AFSK-encoded below.
# The header must be terminated by a dash after the callsign
# ("...-JJJHHMM-LLLLLLLL-"), as in the format description at the top of this
# file and in the SAME protocol; the trailing "-" was previously missing.
# NOTE(review): the format description also lists a "000" country-code field
# after the location digits, which is not emitted here — confirm whether the
# location block should be AAGGPP or AAGGPP000 before changing it.
EAScode = f"ZCZC-{Originator}-{Event}-{Area}{Government}{Provincial}+{Purge}-{Issue}-{Sender}-"
def preamble(samplerate=48000, baudrate=100, tone1=1000, tone2=2000):
    """Build the FSK waveform of the 16-byte header preamble.

    The bit sequence 1, 1, 0, 1, 0, 1, 0, 1 (0xAB sent least-significant bit
    first) is repeated sixteen times. Every bit is an independent sine burst
    that starts at phase zero: ``tone2`` scaled to 0.8 for a 1 bit, ``tone1``
    at full scale for a 0 bit.

    Args:
        samplerate: Output sample rate in Hz.
        baudrate: Bits per second. Each bit lasts
            ``int((1.0 / baudrate) * samplerate)`` samples, so any fractional
            sample is truncated.
        tone1: Frequency in Hz used for 0 bits.
        tone2: Frequency in Hz used for 1 bits.

    Returns:
        A 1-D float64 array holding ``16 * 8 * samples_per_bit`` samples
        (empty when one bit is shorter than one sample).
    """
    t = 1.0 / baudrate
    samples_per_bit = int(t * samplerate)

    # Both bursts are identical for every occurrence of their bit value, so
    # they are synthesised once instead of once per bit.
    sample_index = np.arange(samples_per_bit)
    one_burst = np.sin(2 * np.pi * tone2 * sample_index / samplerate) * 0.8
    zero_burst = np.sin(2 * np.pi * tone1 * sample_index / samplerate)

    bitlist = (1, 1, 0, 1, 0, 1, 0, 1)
    one_byte = np.concatenate([one_burst if bit else zero_burst for bit in bitlist])

    # Tiling a single byte avoids the repeated reallocation of np.append.
    return np.tile(one_byte, 16)
# AFSK parameters shared by the preamble and both data bursts:
# 520 5/6 baud, 1562.5 Hz for tone1 and 2083 1/3 Hz for tone2.
same_fsk = {"baudrate": 520+(5/6), "tone1": 1562.5, "tone2": 2083+(1/3)}

EASpreamble = preamble(**same_fsk)
AFSKdata = FSKEncoderV3(EAScode, **same_fsk)    # header text
AFSKdataend = FSKEncoderV3("NNNN", **same_fsk)  # end-of-message marker

# Attention signal: equal mix of an 853 and a 960 tone, serialised as float32
# bytes ready for the output stream.
# NOTE(review): tonegen's arguments are presumably (frequency in Hz, duration
# in seconds) — confirm against damp11113.DSP.
altone853 = tonegen(853, 5)
altone960 = tonegen(960, 5)
attention_mix = (altone853 + altone960) * 0.5
altone = attention_mix.astype(np.float32).tobytes()

# Each transmitted burst is the preamble immediately followed by its data.
header = np.concatenate([EASpreamble, AFSKdata])
tail = np.concatenate([EASpreamble, AFSKdataend])
# Playback sequence: open the output device, lower the main audio input, send
# header x3, attention tone, spoken message and end-of-message x3, then
# release the device and restore the main audio input.
#
# Everything after PyAudio is created runs under try/finally so that a failure
# part-way through cannot leave the main input attenuated, the audio device
# open, or the progress indicator running.
progress.update(1)
progress.status = "Starting EAS Audio Device..."
p = pyaudio.PyAudio()
stream = None
main_audio_muted = False
try:
    # NOTE(review): output_device_index=20 is a hard-coded device number and
    # is presumably specific to the author's machine — confirm before reuse.
    stream = p.open(format=pyaudio.paFloat32, channels=1, rate=48000, output=True, output_device_index=20)
    progress.update(1)

    progress.status = "Mute Main Audio"
    main_audio_muted = True  # set first so a partial failure still restores the gain
    vmr.inputs[3].gain = -60
    time.sleep(1)
    progress.update(1)

    progress.status = "Sending Header..."
    header_bytes = header.astype(np.float32).tobytes()  # convert once, send three times
    # NOTE(review): the source's indentation was lost; the one-second pause is
    # placed inside the loop to match SAME's gap between the three bursts — confirm.
    for _ in range(3):
        stream.write(header_bytes)
        time.sleep(1)
    progress.update(1)

    progress.status = "Sending Tone..."
    stream.write(altone)
    time.sleep(1)
    progress.update(1)

    progress.status = "Sending Messages..."
    stream.write(resampled_audio_data.astype(np.float32).tobytes())
    time.sleep(1)
    progress.update(1)

    progress.status = "Sending EOM..."
    tail_bytes = tail.astype(np.float32).tobytes()
    for _ in range(3):
        stream.write(tail_bytes)
        time.sleep(1)
    progress.update(1)

    progress.status = "Stopping EAS Audio Device..."
finally:
    try:
        if stream is not None:
            stream.stop_stream()
            stream.close()
        p.terminate()
    finally:
        # Runs even if closing the device failed: the main audio must come back.
        progress.update(1)
        progress.status = "UnMute Main Audio..."
        if main_audio_muted:
            vmr.inputs[3].gain = 0
            time.sleep(1)
        progress.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment