Skip to content

Instantly share code, notes, and snippets.

@shakir915
Last active June 21, 2024 08:16
Show Gist options
  • Save shakir915/7ca31774a5bb5c54f89560945112710f to your computer and use it in GitHub Desktop.
Save shakir915/7ca31774a5bb5c54f89560945112710f to your computer and use it in GitHub Desktop.
YahooSocketFetcher
package shakir.bhav.common
import com.google.common.primitives.Ints
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import okhttp3.*
import proto.My // proto file : https://gist.github.com/shakir915/f2f078058183a5beb4a853d7d6529de4
import java.io.File
import java.io.FileOutputStream
import java.text.SimpleDateFormat
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.collections.ArrayList
import kotlin.concurrent.thread
object YahooSocketFetcher {
// When true, saveFile() buffers incoming payloads and periodically appends them to disk;
// when false, saveFile() does nothing.
var saveRaw = true
// Two in-memory buffers for raw payloads; saveFile() alternates between them so that one
// can be written out while the other receives new payloads.
val rawlist1 = arrayListOf<ByteArray>()
val rawlist2 = arrayListOf<ByteArray>()
// Buffer that saveFile() currently appends new payloads to.
var rawAddList = rawlist1
// Buffer that saveFile()'s background thread writes to disk and then clears.
var rawSaveList = rawlist1
// Buffer size at which saveFile() triggers a flush; read from settings on first access.
val appendFileOnArraySize by lazy {
settings.appendFileOnArraySize
}
/**
 * Stops the fetcher: clears [socketEnabled] so that [restart] will not reconnect, then asks
 * the current socket (if there is one) to close with code 1000.
 */
fun kill() {
    println("YahooSocketFetcher : kill() called")
    socketEnabled = false
    val current = webSocket
    if (current != null) {
        try {
            current.close(1000, "")
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }
}
/**
 * Adds the symbols from [list_i] that are not yet present in [list] and flags the
 * subscription to be re-sent.
 *
 * Only the missing symbols are appended (each once, in the order given). The shared list is
 * never cleared and refilled, so code iterating it concurrently cannot observe it
 * temporarily empty. Nothing happens when every symbol is already known.
 *
 * @param list_i symbols to subscribe to, without the ".NS" suffix the listener appends.
 */
fun addSymbol(list_i: List<String>) {
    println("YahooSocketFetcher : addSymbol() called with: list_i = $list_i")
    val missing = list_i.filterNot { it in list }.distinct()
    if (missing.isEmpty()) return
    list.addAll(missing)
    sendPendingForAddNewSymbol = true
}
// True while restart() is closing and re-creating the socket; restart() calls made in the
// meantime return without doing anything.
var inisiationOnProgress = false
// Set when the subscribe message must be (re-)sent; the listener checks and clears it after
// each incoming text message.
var sendPendingForAddNewSymbol = false
// Callbacks registered through start(). Each one is invoked with a
// (symbol without ".NS", price, time as reported by the message) triple.
var functions: ArrayList<((Triple<String, Double, Long>) -> Unit)> = arrayListOf()
/**
 * Enables the socket, registers [function] as a price callback and subscribes to [list].
 *
 * If a socket exists and a price message was parsed within the last 10 seconds, the
 * subscription is only flagged for re-sending; otherwise the socket is restarted after a
 * 100 ms delay.
 *
 * @param list symbols to subscribe to.
 * @param function callback receiving (symbol, price, time) triples.
 */
fun start(list: List<String>, function: (Triple<String, Double, Long>) -> Unit) {
    println("YahooSocketFetcher : start() called with: list = $list, function = $function")
    socketEnabled = true
    functions.add(function)
    addSymbol(list)

    val millisSinceLastPrice = System.currentTimeMillis() - lastOnMessageWithPriceTriggeredAtMilli
    val socketLooksAlive = webSocket != null && millisSinceLastPrice <= 10000
    if (!socketLooksAlive) {
        restart(100)
        return
    }
    sendPendingForAddNewSymbol = true
}
/**
 * Single HTTP client reused for every reconnect. OkHttp recommends sharing one client
 * because each instance owns its own connection pool and thread pools.
 */
private val socketClient: OkHttpClient by lazy {
    OkHttpClient.Builder()
        .readTimeout(0, TimeUnit.MILLISECONDS)
        .build()
}

/**
 * Closes the current socket (if any), blocks the calling thread for [delay] milliseconds and
 * then, if [socketEnabled] is still true, opens a new socket using [webSocketListener].
 *
 * Calls made while another restart is in progress return immediately.
 *
 * @param delay pause in milliseconds between closing the old socket and opening a new one.
 */
fun restart(delay: Long = 3000L) {
    println("YahooSocketFetcher : restart() called with: delay = $delay")
    if (inisiationOnProgress) return
    inisiationOnProgress = true
    try {
        try {
            webSocket?.close(1000, "")
        } catch (e: Exception) {
            // Closing is best effort, but a failure here is still worth seeing in the log.
            e.printStackTrace()
        }
        Thread.sleep(delay)
        if (socketEnabled) {
            try {
                val request = Request.Builder()
                    .url("wss://streamer.finance.yahoo.com/")
                    .build()
                webSocket = socketClient.newWebSocket(request, webSocketListener)
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }
    } finally {
        // Always release the guard: if the sleep is interrupted the flag must not stay set,
        // otherwise every later restart() call would be ignored.
        inisiationOnProgress = false
    }
}
// Master switch: restart() only opens a new socket while this is true.
// start() sets it to true, kill() sets it to false.
var socketEnabled = true
// The socket most recently created by restart(), or null before the first one.
var webSocket: WebSocket? = null
// Symbols to subscribe to; the listener appends ".NS" to each one when it builds the
// subscribe message.
var list: ArrayList<String> = arrayListOf()
// System.currentTimeMillis() at the moment the last text message was parsed; 0 until then.
var lastOnMessageWithPriceTriggeredAtMilli = 0L
/**
 * Listener attached to every socket opened by [restart].
 *
 * Text messages are Base64-decoded, parsed as `My.PricingData`, passed to `IndicatorCheck`
 * and to every callback in [functions], and finally handed to [saveFile]. The subscribe
 * message is sent when the socket opens and again whenever [sendPendingForAddNewSymbol] is
 * set. A closed or failed socket triggers [restart].
 */
val webSocketListener = object : WebSocketListener() {
    override fun onMessage(webSocket: WebSocket, text: String) {
        super.onMessage(webSocket, text)
        try {
            val ba = Base64.getDecoder().decode(text)
            My.PricingData.parseFrom(ba)?.let { pricing ->
                lastOnMessageWithPriceTriggeredAtMilli = System.currentTimeMillis()
                val triple = Triple(pricing.id.replace(".NS", ""), pricing.price.toDouble(), pricing.time)
                // Run the indicator check once per message rather than once per registered
                // callback. The emptiness guard keeps the previous behaviour of doing
                // nothing when no callback is registered.
                if (functions.isNotEmpty()) {
                    IndicatorCheck.check(triple)
                    functions.forEach { function -> function(triple) }
                }
            }
            saveFile(ba)
        } catch (e: Exception) {
            e.printStackTrace()
        }
        if (sendPendingForAddNewSymbol) {
            sendPendingForAddNewSymbol = false
            send()
        }
    }

    override fun onOpen(webSocket: WebSocket, response: Response) {
        super.onOpen(webSocket, response)
        println("webSocket onOpen ${YahooSocketFetcher.webSocket} $response ")
        send()
    }

    /**
     * Sends `{"subscribe":["A.NS","B.NS",...]}` for every non-blank entry of [list] on the
     * fetcher's current socket. Nothing is sent when there is no symbol to subscribe to.
     */
    fun send() {
        try {
            val symbols = list.filter { it.isNotBlank() }
            if (symbols.isEmpty()) return
            val message = symbols.joinToString(
                separator = "\",\"",
                prefix = "{\"subscribe\":[\"",
                postfix = "\"]}",
            ) { "${it}.NS" }
            // No parameter shadows the name here, so this is the fetcher's own property.
            webSocket?.send(message)
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
        super.onClosed(webSocket, code, reason)
        println("webSocket onClosed $webSocket $code $reason")
        restart()
    }

    override fun onClosing(
        webSocket: WebSocket,
        code: Int,
        reason: String,
    ) {
        super.onClosing(webSocket, code, reason)
        println("webSocket onClosing $webSocket $code $reason")
        // Only complete the close handshake here. onClosed is delivered once both sides
        // have closed and performs the single restart(); restarting from both callbacks
        // could reconnect twice for one close.
        try {
            webSocket.close(1000, "")
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    override fun onFailure(
        webSocket: WebSocket,
        t: Throwable,
        response: Response?,
    ) {
        super.onFailure(webSocket, t, response)
        println("webSocket onFailure $webSocket $t $response")
        restart()
    }

    // Binary frames are ignored.
    override fun onMessage(webSocket: WebSocket, bytes: okio.ByteString) {
        super.onMessage(webSocket, bytes)
    }
}
/**
 * Buffers a raw payload and, when the buffer is full or a flush is forced, appends the
 * buffered payloads to a part file on a background thread.
 *
 * Each payload is written as its length (4 bytes from Guava's `Ints.toByteArray`) followed
 * by the payload bytes. Does nothing at all while [saveRaw] is false.
 * NOTE(review): in that case [function] is never invoked either; confirm callers expect this.
 * NOTE(review): a flush that starts before the previous writer thread has finished would
 * reuse the buffer that thread is still writing; not addressed here.
 *
 * @param ba payload to buffer, or null to force a flush of whatever is buffered.
 * @param function optional callback run on the background thread after the flush.
 */
fun saveFile(ba: ByteArray?, function: (() -> Unit)? = null) {
    if (!saveRaw) return
    if (ba != null) rawAddList.add(ba)
    if (ba != null && rawAddList.size != appendFileOnArraySize) return

    // Capture the full buffer in a local so the writer thread keeps working on this exact
    // list even if the rawSaveList property is reassigned by a later flush.
    val pending = rawAddList
    rawSaveList = pending
    // Swap buffers by identity. Structural equality treats two empty lists as equal, which
    // could leave new payloads going into the buffer that is being written out.
    rawAddList = if (pending === rawlist1) rawlist2 else rawlist1

    thread {
        val partFile = getDataFolder("YahooSocketBinaryPartFiles", milliFileDateTime(System.currentTimeMillis()))
        pending.forEach { payload ->
            partFile.appendBytes(Ints.toByteArray(payload.size))
            partFile.appendBytes(payload)
        }
        pending.clear()
        function?.invoke()
    }
}
/**
 * Stops the socket, flushes buffered payloads, merges the part files of each date into one
 * file per date, deletes the part files and uploads the merged files through `TGUpload`.
 *
 * Part files are grouped by the text before the first "__" in their name and appended in
 * name order. The merge runs in the [saveFile] flush callback.
 * NOTE(review): [saveFile] skips that callback while [saveRaw] is false, so nothing is
 * merged or uploaded in that case.
 *
 * @param logBack optional callback invoked with "DONE" after the upload call returns.
 */
fun partFileToSingleFileAndUploadToTG(logBack: ((s: String?) -> Unit)? = null) {
    kill()
    saveFile(null) {
        val partFolder = getDataFolder("YahooSocketBinaryPartFiles")
        // File.list()/listFiles() return null when the path is not a readable directory.
        val dates = partFolder.list().orEmpty()
            .map { it.substringBefore("__") }
            .distinct()
        dates.forEach { date ->
            val files = partFolder.listFiles().orEmpty()
                .filter { it.name.startsWith(date) }
                .sortedBy { it.name }
                .toTypedArray()
            getDataFolder("YahooSocketBinary", "YahooSocketBinary__" + date).appendAll(files = files)
            files.forEach { it.delete() }
        }
        // NOTE(review): GlobalScope is kept because no lifecycle-bound scope is visible here.
        GlobalScope.launch {
            TGUpload.upload(dates.map { getDataFolder("YahooSocketBinary", "YahooSocketBinary__" + it) })
            logBack?.invoke("DONE")
        }
    }
}
/**
 * Appends the contents of [files], in the order given, to the end of this file.
 *
 * The receiver is opened in append mode, so it is created when it does not exist yet.
 * Entries that are directories or do not exist are skipped silently.
 *
 * @param bufferSize block size passed to `forEachBlock` when copying each source file.
 * @param files source files to append.
 * @throws IllegalArgumentException if the receiver is a directory.
 */
fun File.appendAll(bufferSize: Int = 4096, vararg files: File) {
    require(!isDirectory) { "The file is a directory." }
    FileOutputStream(this, true).use { sink ->
        files.asSequence()
            .filter { source -> !source.isDirectory && source.exists() }
            .forEach { source ->
                source.forEachBlock(bufferSize) { chunk, length ->
                    sink.write(chunk, 0, length)
                }
            }
    }
}
/*
fun process(text: String) {
val f1l = ba[1]
var f1v = String(Arrays.copyOfRange(ba, 2, 2 + f1l))
if (f1v.contains(".NS") || true) {
f1v = f1v.replace(".NS", "")
val f2v = ByteBuffer.wrap(Arrays.copyOfRange(ba, 3 + f1l, 7 + f1l)).order(ByteOrder.LITTLE_ENDIAN).getFloat()
println("$f1v $f2v")
}
}*/
/**
 * Replays every file in the "YahooSocketBinary" folder through `IndicatorCheck`.
 *
 * `IndicatorCheck.pushEnabled` is switched off for the replay and switched back on
 * afterwards, even if the replay throws. For each file, the date is parsed from its name
 * ("YahooSocketBinary__yyyy_MM_dd"), `IndicatorCheck` is restarted with that day's pivot
 * list, and the stored records are checked in ascending time order.
 *
 * Each record is a 4-byte big-endian length followed by that many bytes of `My.PricingData`.
 * A file that ends in the middle of a record is replayed up to its last complete record.
 */
fun testSaved() {
    IndicatorCheck.pushEnabled = false
    try {
        // listFiles() returns null when the folder is missing or unreadable.
        getDataFolder("YahooSocketBinary").listFiles().orEmpty().forEach { file ->
            val ci = Calendar.getInstance().apply {
                timeInMillis = SimpleDateFormat("yyyy_MM_dd", Locale.ENGLISH)
                    .parse(file.name.replace("YahooSocketBinary__", "")).time
                set(Calendar.HOUR_OF_DAY, 5)
                set(Calendar.MINUTE, 0)
                set(Calendar.SECOND, 0)
                set(Calendar.MILLISECOND, 0)
            }
            val ohlcp = getOHLCPivotList(ci)
            IndicatorCheck.restart(ohlcp)

            val ticks = arrayListOf<Triple<String, Double, Long>>()
            // readInt/readFully either deliver the full amount or throw EOFException, so a
            // short read can never be mistaken for a complete record, and a zero-length
            // record no longer ends the replay early.
            java.io.DataInputStream(file.inputStream().buffered()).use { input ->
                while (true) {
                    val size = try {
                        input.readInt()
                    } catch (e: java.io.EOFException) {
                        break
                    }
                    if (size < 0) break // corrupt length prefix
                    val payload = ByteArray(size)
                    try {
                        input.readFully(payload)
                    } catch (e: java.io.EOFException) {
                        break
                    }
                    My.PricingData.parseFrom(payload)?.let {
                        ticks.add(Triple(it.id.replace(".NS", ""), it.price.toDouble(), it.time))
                    }
                }
            }
            ticks.sortedBy { it.third }.forEach {
                IndicatorCheck.check(it)
            }
        }
    } finally {
        IndicatorCheck.pushEnabled = true
    }
}
/**
 * Folds one price tick into parallel per-minute candle lists.
 *
 * The tick time ([pb]'s third component, epoch milliseconds) is truncated to the start of its
 * minute and converted to seconds. If [t] already holds that value, the candle at the same
 * index gets its high or low raised/lowered as needed and its close set to the tick price.
 * Otherwise a new candle is appended with open = high = low = close = price and volume 0.
 *
 * @param pb (symbol, price, time in epoch milliseconds); the symbol is not used here.
 * @param o open prices, @param h highs, @param l lows, @param c closes,
 * @param v volumes, @param t candle start times in epoch seconds; all indexed alike.
 */
fun addToMinuteCandle(pb: Triple<String, Double, Long>, o: MutableList<Float>, h: MutableList<Float>, l: MutableList<Float>, c: MutableList<Float>, v: ArrayList<Long>, t: ArrayList<Long>) {
    val minuteStartSeconds = Calendar.getInstance().also { cal ->
        cal.timeInMillis = pb.third
        cal.set(Calendar.SECOND, 0)
        cal.set(Calendar.MILLISECOND, 0)
    }.timeInMillis / 1000
    val price = pb.second
    val index = t.indexOf(minuteStartSeconds)

    if (index < 0) {
        // First tick of this minute: start a new candle.
        t.add(minuteStartSeconds)
        h.add(price.toFloat())
        o.add(price.toFloat())
        l.add(price.toFloat())
        c.add(price.toFloat())
        v.add(0)
        return
    }

    when {
        price > h[index] -> {
            h[index] = price.toFloat()
        }
        price < l[index] -> {
            l[index] = price.toFloat()
        }
    }
    c[index] = price.toFloat()
}
/**
 * Folds one price tick into parallel per-day candle lists.
 *
 * The tick time ([pb]'s third component, epoch milliseconds) is truncated to the start of its
 * calendar day in the JVM default time zone and converted to seconds. If [t] already holds
 * that value, the candle at the same index gets its high or low raised/lowered as needed and
 * its close set to the tick price. Otherwise a new candle is appended with
 * open = high = low = close = price and volume 0.
 *
 * This previously truncated to the minute only (a copy of [addToMinuteCandle] marked as a
 * todo), which produced one candle per minute instead of one per day.
 * NOTE(review): confirm that local midnight is the day boundary the rest of the project uses.
 *
 * @param pb (symbol, price, time in epoch milliseconds); the symbol is not used here.
 * @param o open prices, @param h highs, @param l lows, @param c closes,
 * @param v volumes, @param t candle start times in epoch seconds; all indexed alike.
 */
fun addToDayCandle(pb: Triple<String, Double, Long>, o: MutableList<Float>, h: MutableList<Float>, l: MutableList<Float>, c: MutableList<Float>, v: ArrayList<Long>, t: ArrayList<Long>) {
    val dayStartSeconds = Calendar.getInstance().apply {
        timeInMillis = pb.third
        set(Calendar.HOUR_OF_DAY, 0)
        set(Calendar.MINUTE, 0)
        set(Calendar.SECOND, 0)
        set(Calendar.MILLISECOND, 0)
    }.timeInMillis / 1000
    val price = pb.second
    val i = t.indexOf(dayStartSeconds)
    if (i >= 0) {
        if (price > h[i]) {
            h[i] = price.toFloat()
        } else if (price < l[i]) {
            l[i] = price.toFloat()
        }
        c[i] = price.toFloat()
    } else {
        // First tick of this day: start a new candle.
        t.add(dayStartSeconds)
        h.add(price.toFloat())
        o.add(price.toFloat())
        l.add(price.toFloat())
        c.add(price.toFloat())
        v.add(0)
    }
}
}
@shakir915
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment