@etorreborre
Created March 7, 2014 06:22
Computing the MD5 hash of a file and writing it using scalaz-stream
import scalaz.stream.{Process1, Process, io, process1}
import Process._
import java.security._
import scalaz._
import scala.collection.mutable
import java.math.BigInteger
val P = Process
// store the final digest in a buffer
val digestBuffer = new ValueBuffer[MessageDigest]
val p1 =
  Process.range(0, 10)
    .map(_.toString)
    .pipe(process1.utf8Encode)
    // use a writer to compute the current MD5 digest
    .pipe(logMd5)
    // "drain" the Write values to a buffer just keeping the last one
    .drainW(io.fillBuffer(digestBuffer))
    .to(io.fileChunkW("target/test"))
// run the process to write the file
p1.runLast.run
// print the MD5 as 32 hex characters (BigInteger.toString(16) alone would drop leading zeros)
println("%032x".format(new BigInteger(1, digestBuffer.toList.head.digest)))
// Process1 updating an MD5 digest each time it receives an Array[Byte]
def logMd5: Process1[Array[Byte], MessageDigest \/ Array[Byte]] = {
  def go(digest: MessageDigest): Process1[Array[Byte], MessageDigest \/ Array[Byte]] = {
    receive1 {
      case arr =>
        digest.update(arr)
        (P.emitO(arr) ++ P.emitW(digest)) fby go(digest)
    }
  }
  go(MessageDigest.getInstance("MD5"))
}
// Buffer holding at most one value: each addition replaces the previous one
class ValueBuffer[A] extends mutable.Buffer[A] {
  private var element: Option[A] = None
  def +=(elem: A) = { element = Some(elem); this }
  def +=:(elem: A) = { element = Some(elem); this }
  def apply(n: Int): A = ???
  def clear(): Unit = element = None
  def insertAll(n: Int, elems: Traversable[A]): Unit = ???
  def length: Int = element.toList.size
  def remove(n: Int): A = ???
  def update(n: Int, newelem: A): Unit = ???
  def iterator: Iterator[A] = element.toList.iterator
}
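The printed digest can be cross-checked without scalaz-stream. The following is a minimal sketch using only `java.security.MessageDigest`, assuming `Process.range(0, 10)` emits 0 through 9 so that the chunks are the UTF-8 bytes of "0" to "9". The names `Md5Check`, `toHex` and `md5Hex` are hypothetical helpers, not part of the gist. It also shows why the hex rendering matters: `new BigInteger(1, bytes).toString(16)` drops leading zeros, while a zero-padded format always yields 32 characters.

```scala
import java.math.BigInteger
import java.nio.charset.StandardCharsets.UTF_8
import java.security.MessageDigest

object Md5Check {
  // Hypothetical helper: render a digest as 32 zero-padded hex characters.
  def toHex(bytes: Array[Byte]): String =
    "%032x".format(new BigInteger(1, bytes))

  // Feed the chunks one at a time, as the stream does, then finish the digest.
  def md5Hex(chunks: Seq[Array[Byte]]): String = {
    val digest = MessageDigest.getInstance("MD5")
    chunks.foreach(chunk => digest.update(chunk))
    toHex(digest.digest())
  }

  def main(args: Array[String]): Unit = {
    // Assumption: these are the chunks the stream above produces.
    val chunks = (0 until 10).map(_.toString.getBytes(UTF_8))
    println(md5Hex(chunks))

    // MD5("a") starts with a zero nibble (RFC 1321 test vector),
    // so the unpadded rendering is one character short.
    val a = MessageDigest.getInstance("MD5").digest("a".getBytes(UTF_8))
    println(new BigInteger(1, a).toString(16)) // cc175b9c0f1b6a831c399e269772661
    println(toHex(a))                          // 0cc175b9c0f1b6a831c399e269772661
  }
}
```

Updating the digest chunk by chunk gives the same result as hashing the concatenated bytes in one call, which is what makes the streaming approach valid.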
@fthomas

fthomas commented Mar 7, 2014

Hi Eric! Can you accomplish the same with a simpler md5 and Process.logged and pipeO (or other Writer combinators)? I imagine that a simpler md5 could look like this:

def md5: Process1[Array[Byte],Array[Byte]] = {
  def go(digest: MessageDigest): Process1[Array[Byte],Array[Byte]] =
    await1[Array[Byte]].flatMap { bytes =>
      digest.update(bytes)
      go(digest)
    } orElse emit(digest.digest())
  go(MessageDigest.getInstance("MD5"))
}
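The shape of this `md5` (consume every chunk, then emit the digest once when the input ends) can be mirrored on a plain list. This is only an analogy in standard-library Scala, not scalaz-stream code; `Md5Fold` and its members are hypothetical names. The `Nil` case plays the role of the `orElse` branch, so an empty input still yields a digest.

```scala
import java.security.MessageDigest
import scala.annotation.tailrec

object Md5Fold {
  // Consume all chunks, updating the digest, and produce the result only at the end.
  def md5(chunks: List[Array[Byte]]): Array[Byte] = {
    @tailrec
    def go(digest: MessageDigest, rest: List[Array[Byte]]): Array[Byte] = rest match {
      case bytes :: tail =>
        digest.update(bytes)
        go(digest, tail)
      case Nil =>
        // End of input: the counterpart of the `orElse emit(...)` branch.
        digest.digest()
    }
    go(MessageDigest.getInstance("MD5"), chunks)
  }

  // Hypothetical helper: two hex characters per byte.
  def hex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString

  def main(args: Array[String]): Unit = {
    println(hex(md5(Nil))) // d41d8cd98f00b204e9800998ecf8427e (MD5 of empty input)
    println(hex(md5(List("a".getBytes("UTF-8"), "bc".getBytes("UTF-8")))))
    // 900150983cd24fb0d6963f7d28e17f72 (MD5 of "abc")
  }
}
```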

@fthomas

fthomas commented Mar 7, 2014

I tried to use md5 with pipeO but was stopped by https://github.com/scalaz/scalaz-stream/issues/122

@pchiusano

Might want to use emitLazy in your version. But yeah, using that with pipeO seems like a nice way to do it, assuming pipe works properly.
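The general concern behind a lazy emit can be illustrated with the standard library alone. In the sketch below (hypothetical names, not scalaz-stream code), calling `digest()` before any bytes have arrived yields the MD5 of empty input, while deferring the call until after the updates yields the intended value. Whether the `emit(digest.digest())` above is actually evaluated too early depends on the combinators' signatures, which this sketch does not assert.

```scala
import java.math.BigInteger
import java.nio.charset.StandardCharsets.UTF_8
import java.security.MessageDigest

object EagerVsLazy {
  def hex(bytes: Array[Byte]): String =
    "%032x".format(new BigInteger(1, bytes))

  // Returns (eagerly computed hex, lazily computed hex) for one MessageDigest.
  def run(): (String, String) = {
    val digest = MessageDigest.getInstance("MD5")
    val eager = hex(digest.digest())         // evaluated now, before any input
    lazy val deferred = hex(digest.digest()) // evaluated on first access
    digest.update("abc".getBytes(UTF_8))
    (eager, deferred)                        // `deferred` is forced here, after the update
  }

  def main(args: Array[String]): Unit = {
    val (eager, deferred) = run()
    println(eager)    // d41d8cd98f00b204e9800998ecf8427e (MD5 of empty input)
    println(deferred) // 900150983cd24fb0d6963f7d28e17f72 (MD5 of "abc")
  }
}
```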

@etorreborre
Author

You're right @fthomas, it's much better to have md5 as a Process1. Can you paste here what you were trying to write with pipeO to get both the md5 and the output written to a file (assuming the bug you found is fixed)?

@fthomas

fthomas commented Mar 8, 2014

Here is what I had in mind:

val p1 = Process.range(0, 10)
  .map(_.toString)
  .pipe(process1.utf8Encode)

val digest = Process.logged(p1)
  .pipeO(md5)
  .drainW(io.fileChunkW("target/test"))
  .runLast.run

@fthomas

fthomas commented Mar 8, 2014

Oh my... this is embarrassing. The problem with pipeO can be obviated if we drainW to the sink and then pipe to md5:

val digest = Process.logged(p1)
  .drainW(io.fileChunkW("target/test"))
  .pipe(md5)
  .runLast.run

Eric, does this work for you?
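For comparison, the same "write to the sink, then obtain the digest" ordering can be sketched without scalaz-stream. This swaps in `java.security.DigestOutputStream`, which updates a digest with every byte written through it. `WriteAndDigest` and `writeWithMd5` are hypothetical names, and the temp file stands in for `target/test`.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}
import java.security.{DigestOutputStream, MessageDigest}

object WriteAndDigest {
  // Write every chunk to `target` and return the MD5 of the bytes written.
  def writeWithMd5(chunks: Iterator[Array[Byte]], target: Path): Array[Byte] = {
    val out = new DigestOutputStream(
      Files.newOutputStream(target),
      MessageDigest.getInstance("MD5"))
    try chunks.foreach(chunk => out.write(chunk))
    finally out.close()
    // The digest is read only after all writes have completed.
    out.getMessageDigest.digest()
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a temp file is used here in place of "target/test".
    val target = Files.createTempFile("md5-sketch", ".txt")
    val chunks = (0 until 10).iterator.map(_.toString.getBytes(UTF_8))
    val md5 = writeWithMd5(chunks, target)
    println(md5.map("%02x".format(_)).mkString)
  }
}
```

The design point is the same as in the pipeline above: the file write and the digest update see the same bytes in the same pass, so the file is never read a second time.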
