Created
August 9, 2024 15:33
-
-
Save fancellu/eb56c1e66e9f4066f8bffcac188ea388 to your computer and use it in GitHub Desktop.
FS2 moving average demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Fahrenheit temperatures | |
should not parse | |
75 | |
60 | |
55 | |
54 | |
40 | |
60 | |
55 | |
66 | |
49 | |
54 | |
51 | |
59 | |
38 | |
50 | |
54 | |
63 | |
54 | |
49 | |
66 | |
61 | |
67 | |
55 | |
61 | |
59 | |
60 | |
65 | |
59 | |
41 | |
69 | |
62 | |
64 | |
53 | |
66 | |
54 | |
80 | |
64 | |
71 | |
53 | |
58 | |
42 | |
51 | |
53 | |
57 | |
43 | |
65 | |
66 | |
51 | |
67 | |
43 | |
56 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.{IO, IOApp} | |
import fs2.{Stream, text} | |
import fs2.io.file.{Files, Path} | |
import fs2.io.{stdout, stderr} | |
import scala.util.Try | |
object WindowedAverage extends IOApp.Simple: | |
private object Fahrenheit: | |
// Using extractor to handle empty lines, comment lines and non valid strings | |
def unapply(line: String): Option[Double] = | |
if line.trim.nonEmpty && !line.startsWith("//") then | |
Try(line.toDouble).toOption | |
else None | |
private val averager = | |
// Stream is terminated after the first error, hence the need to emit to stderr | |
def handleError(error: Throwable): Stream[IO, Unit] = | |
Stream | |
.emit(s"\nError: ${error.getMessage}") | |
.through(text.utf8.encode) | |
.through(stderr[IO]()) | |
val WINDOW = 5; | |
val inout = Files[IO] | |
.readUtf8Lines(Path("testdata/fahrenheit.txt")) | |
.handleErrorWith(handleError) // Handle potential errors here | |
.collect { case Fahrenheit(double) => | |
double | |
} | |
.scan(Vector.empty[Double]) { case (list, dub) => | |
(if list.size >= WINDOW then list.drop(1) else list) :+ dub | |
} | |
.filter(_.length >= WINDOW) | |
.map(li => li.sum / li.length) | |
.map(_.toString) | |
// I prepend a header line before the averaged values | |
(Stream.emit("// running average") ++ inout) | |
.intersperse("\n") | |
.through(text.utf8.encode) | |
.through(stdout[IO]()) | |
def run: IO[Unit] = | |
averager.compile.drain |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output:
// running average
56.8
53.8
52.8
55.0
54.0
56.8
55.0
55.8
50.2
50.4
50.4
52.8
51.8
54.0
57.2
58.6
59.4
59.6
62.0
60.6
60.4
60.0
60.8
56.8
58.8
59.2
59.0
57.8
62.8
59.8
63.4
63.4
67.0
64.4
65.2
57.6
55.0
51.4
52.2
49.2
53.8
56.8
56.4
58.4
58.4
56.6