Skip to content

Instantly share code, notes, and snippets.

@dstd
Last active February 19, 2020 17:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dstd/cc5710c52c58eec6f14ee1eb229eb4de to your computer and use it in GitHub Desktop.
Save dstd/cc5710c52c58eec6f14ee1eb229eb4de to your computer and use it in GitHub Desktop.
[Fuel library] patch to support payload streaming in uploading/downloading
diff --git a/kittinunf/fuel/core/requests/DownloadTaskRequest.kt b/kittinunf/fuel/core/requests/DownloadTaskRequest.kt
index d64d331..adfab9b 100755
--- a/kittinunf/fuel/core/requests/DownloadTaskRequest.kt
+++ b/kittinunf/fuel/core/requests/DownloadTaskRequest.kt
@@ -11,8 +11,8 @@ import java.net.URL
class DownloadTaskRequest(request: Request) : TaskRequest(request) {
- val BUFFER_SIZE = 1024
-
- var progressCallback: ((Long, Long) -> Unit)? = null
- lateinit var destinationCallback: ((Response, URL) -> File)
+ var destinationFileCallback: ((Response, URL) -> File)? = null
+ var destinationStreamCallback: ((Response, URL) -> OutputStream)? = null
- lateinit var dataStream: InputStream
- lateinit var fileOutputStream: FileOutputStream
@@ -20,12 +20,12 @@ class DownloadTaskRequest(request: Request) : TaskRequest(request) {
override fun call(): Response {
+ var outputStream: OutputStream? = null
try {
+ val destinationStreamCallback = destinationStreamCallback
+ val destinationFileCallback = destinationFileCallback
+
+ if (destinationFileCallback != null) {
+ val file = destinationFileCallback(request, request.url)
+ outputStream = FileOutputStream(file)
+ }
+ else if (destinationStreamCallback != null) {
+ outputStream = destinationStreamCallback(request, request.url)
+ }
+
+ request.responseOutput = outputStream
+
val response = Manager.instance.client.executeRequest(request)
- val file = destinationCallback.invoke(response, request.url)
- //file output
- fileOutputStream = FileOutputStream(file)
- dataStream = ByteArrayInputStream(response.data)
- dataStream.copyTo(fileOutputStream, BUFFER_SIZE) { readBytes ->
- progressCallback?.invoke(readBytes, response.httpContentLength)
- }
return response.apply { dispatchCallback(this) }
} catch(error: FuelError) {
if (error.exception as? InterruptedIOException != null) {
interruptCallback?.invoke(request)
}
throw error
+ } finally {
+ outputStream?.close()
}
diff --git a/kittinunf/fuel/core/Encoding.kt b/kittinunf/fuel/core/Encoding.kt
index 9b2fba4..91eda66 100755
--- a/kittinunf/fuel/core/Encoding.kt
+++ b/kittinunf/fuel/core/Encoding.kt
@@ -5,6 +5,7 @@ import com.github.kittinunf.fuel.util.toHexString
import java.net.MalformedURLException
import java.net.URI
import java.net.URL
+import java.util.*
import kotlin.properties.Delegates
class Encoding : Fuel.RequestConvertible {
@@ -15,9 +16,10 @@ class Encoding : Fuel.RequestConvertible {
var parameters: List<Pair<String, Any?>>? = null
var encoder: (Method, String, List<Pair<String, Any?>>?) -> Request = { method, path, parameters ->
+ val req = Request()
var modifiedPath = path
- var data: ByteArray? = null
- var headerPairs: MutableMap<String, Any> = hashMapOf("Accept-Encoding" to "compress;q=0.5, gzip;q=1.0")
+ val data: ByteArray?
+ val headerPairs = hashMapOf("Accept-Encoding" to "compress;q=0.5, gzip;q=1.0")
if (encodeParameterInUrl(method)) {
var querySign = ""
val queryParamString = queryFromParameters(parameters)
@@ -27,14 +29,18 @@ class Encoding : Fuel.RequestConvertible {
}
}
modifiedPath += (querySign + queryParamString)
+ data = null
} else if (requestType.equals(Request.Type.UPLOAD)) {
- val boundary = System.currentTimeMillis().toHexString()
- headerPairs.plusAssign("Content-Type" to "multipart/form-data; boundary=" + boundary)
+ if (req.boundary.isEmpty())
+ req.boundary = "!#" + UUID.randomUUID().toString().replace("-", "") + "#!"
+ headerPairs.plusAssign("Content-Type" to "multipart/form-data; boundary=" + req.boundary)
+ req.parameters = parameters
+ data = null
} else {
- headerPairs.plusAssign("Content-Type" to "application/x-www-form-urlencoded")
+ headerPairs.plusAssign("Content-Type" to "application/x-www-form-urlencoded; charset=utf-8")
data = queryFromParameters(parameters).toByteArray()
}
- Request().apply {
+ req.apply {
httpMethod = method
this.path = modifiedPath
this.url = createUrl(modifiedPath)
diff --git a/kittinunf/fuel/toolbox/HttpClient.kt b/kittinunf/fuel/toolbox/HttpClient.kt
index 0d1c9ed..4acca64 100755
--- a/kittinunf/fuel/toolbox/HttpClient.kt
+++ b/kittinunf/fuel/toolbox/HttpClient.kt
@@ -1,8 +1,12 @@
package com.github.kittinunf.fuel.toolbox
import com.github.kittinunf.fuel.core.*
+import com.github.kittinunf.fuel.util.copyFrom
+import com.github.kittinunf.fuel.util.copyTo
import java.io.BufferedOutputStream
import java.io.IOException
import java.net.HttpURLConnection
import java.net.URLConnection
import java.util.zip.GZIPInputStream
@@ -10,9 +14,9 @@ import javax.net.ssl.HttpsURLConnection
class HttpClient : Client {
override fun executeRequest(request: Request): Response {
- val response = Response()
- response.url = request.url
+ val response = Response(request.url)
+ val connectionInspector = ConnectionInspector.createInstance()
val connection = establishConnection(request) as HttpURLConnection
try {
@@ -23,21 +27,23 @@ class HttpClient : Client {
doInput = true
useCaches = false
requestMethod = request.httpMethod.value
+ connectionInspector.preConnect(connection, request.httpBody)
setDoOutput(connection, request.httpMethod)
for ((key, value) in request.httpHeaders) {
setRequestProperty(key, value)
}
- setBodyIfAny(connection, request.httpBody)
+ setBodyIfAny(connection, request)
}
return response.apply {
+ connectionInspector.postConnect()
httpResponseHeaders = connection.headerFields ?: emptyMap()
httpContentLength = connection.contentLength.toLong()
val contentEncoding = connection.contentEncoding ?: ""
- val dataStream = if (connection.errorStream != null) {
+ val dataStream1 = if (connection.errorStream != null) {
connection.errorStream
} else {
try {
@@ -47,12 +53,17 @@ class HttpClient : Client {
}
}
+ val dataStream = connectionInspector.interpretResponseStream(dataStream1)
+
if (dataStream != null) {
- data = if (contentEncoding.compareTo("gzip", true) == 0) {
- GZIPInputStream(dataStream).readBytes()
- } else {
- dataStream.readBytes()
- }
+ if (request.responseOutput != null)
+ request.responseOutput?.copyFrom(dataStream, bytesTotal = httpContentLength, progress = request.progressCallback)
+ else
+ data = if (contentEncoding.compareTo("gzip", true) == 0) {
+ GZIPInputStream(dataStream).readBytes()
+ } else {
+ dataStream.readBytes()
+ }
}
//try - catch just in case both methods throw
@@ -64,6 +75,8 @@ class HttpClient : Client {
}
}
} catch(exception: Exception) {
+ if (exception is IOException)
+ connectionInspector.httpExchangeFailed(exception)
throw FuelError().apply {
this.exception = exception
this.errorData = response.data
@@ -74,9 +87,10 @@ class HttpClient : Client {
}
private fun establishConnection(request: Request): URLConnection {
- return if (request.url.protocol.equals("https")) {
+ return if (request.url.protocol == "https") {
val conn = request.url.openConnection() as HttpsURLConnection
conn.apply {
+ //!warn : disables SSL verification, while also breaking keep-alive
sslSocketFactory = request.socketFactory
hostnameVerifier = request.hostnameVerifier
}
@@ -85,11 +99,17 @@ class HttpClient : Client {
}
}
- private fun setBodyIfAny(connection: HttpURLConnection, bytes: ByteArray) {
- if (bytes.size != 0) {
- val outStream = BufferedOutputStream(connection.outputStream)
- outStream.write(bytes)
- outStream.close()
+ private fun setBodyIfAny(connection: HttpURLConnection, request: Request) {
+ val httpBody = request.httpBody
+ val httpBodySuffix = request.httpBodySuffix
+ if (httpBody.size == 0 && request.httpBodyStream == null && httpBodySuffix == null)
+ return
+
+ BufferedOutputStream(connection.outputStream).use {
+ it.write(httpBody)
+ request.httpBodyStream?.copyTo(it, progress = request.progressCallback)
+ if (httpBodySuffix != null)
+ it.write(httpBodySuffix)
}
}
@@ -99,7 +119,6 @@ class HttpClient : Client {
Method.POST, Method.PUT, Method.PATCH -> connection.doOutput = true
}
}
-
}
diff --git a/kittinunf/fuel/util/InputStreams.kt b/kittinunf/fuel/util/InputStreams.kt
index 70d5f3e..3cd2fe5 100755
--- a/kittinunf/fuel/util/InputStreams.kt
+++ b/kittinunf/fuel/util/InputStreams.kt
@@ -3,15 +3,28 @@ package com.github.kittinunf.fuel.util
import java.io.InputStream
import java.io.OutputStream
-fun InputStream.copyTo(out: OutputStream, bufferSize: Int = DEFAULT_BUFFER_SIZE, progress: ((Long) -> Unit)?): Long {
+fun InputStream.copyTo(out: OutputStream, bytesTotal: Long = this.available().toLong(), bufferSize: Int = DEFAULT_BUFFER_SIZE, progress: ((Long, Long) -> Unit)?): Long {
var bytesCopied = 0L
val buffer = ByteArray(bufferSize)
var bytes = read(buffer)
while (bytes >= 0) {
out.write(buffer, 0, bytes)
bytesCopied += bytes
- progress?.invoke(bytesCopied)
+ progress?.invoke(bytesCopied, bytesTotal)
bytes = read(buffer)
}
return bytesCopied
}
+
+fun OutputStream.copyFrom(ins: InputStream, bytesTotal: Long = ins.available().toLong(), bufferSize: Int = DEFAULT_BUFFER_SIZE, progress: ((Long, Long) -> Unit)?): Long {
+ var bytesCopied = 0L
+ val buffer = ByteArray(bufferSize)
+ var bytes = ins.read(buffer)
+ while (bytes >= 0) {
+ write(buffer, 0, bytes)
+ bytesCopied += bytes
+ progress?.invoke(bytesCopied, bytesTotal)
+ bytes = ins.read(buffer)
+ }
+ return bytesCopied
+}
diff --git a/kittinunf/fuel/core/Request.kt b/kittinunf/fuel/core/Request.kt
index 9893440..5bbbd10 100755
--- a/kittinunf/fuel/core/Request.kt
+++ b/kittinunf/fuel/core/Request.kt
@@ -1,5 +1,6 @@
package com.github.kittinunf.fuel.core
+import com.github.kittinunf.fuel.android.utils.toReadableString
import com.github.kittinunf.fuel.core.deserializers.ByteArrayDeserializer
import com.github.kittinunf.fuel.core.deserializers.StringDeserializer
import com.github.kittinunf.fuel.core.requests.DownloadTaskRequest
@@ -8,6 +9,8 @@ import com.github.kittinunf.fuel.core.requests.UploadTaskRequest
import com.github.kittinunf.fuel.util.Base64
import com.github.kittinunf.result.Result
import java.io.File
+import java.io.InputStream
+import java.io.OutputStream
import java.net.URL
import java.nio.charset.Charset
import java.util.concurrent.Callable
@@ -28,12 +31,20 @@ class Request {
var syncMode = false
var type: Type = Type.REQUEST
+ var useFormForUpload: Boolean = true
lateinit var httpMethod: Method
lateinit var path: String
lateinit var url: URL
var httpBody: ByteArray = ByteArray(0)
+ var httpBodyStream: InputStream? = null
+ var httpBodySuffix: ByteArray? = null
+ var responseOutput: OutputStream? = null
+ var parameters: List<Pair<String, Any?>>? = null
+
+ var progressCallback: ((Long, Long) -> Unit)? = null
var httpHeaders = hashMapOf<String, String>()
+ var boundary: String = ""
//underlying task request
val taskRequest: TaskRequest by lazy {
@@ -109,41 +120,57 @@ class Request {
}
fun progress(handler: (readBytes: Long, totalBytes: Long) -> Unit): Request {
- if (taskRequest as? DownloadTaskRequest != null) {
- val download = taskRequest as DownloadTaskRequest
- download.apply {
- progressCallback = handler
- }
- } else if (taskRequest as? UploadTaskRequest != null) {
- val upload = taskRequest as UploadTaskRequest
- upload.apply {
- progressCallback = handler
- }
- } else {
- throw IllegalStateException("progress is only used with RequestType.DOWNLOAD or RequestType.UPLOAD")
+ when (taskRequest) {
+ is DownloadTaskRequest,
+ is UploadTaskRequest -> progressCallback = handler
+ else -> throw IllegalStateException("progress is only used with RequestType.DOWNLOAD or RequestType.UPLOAD")
}
return this
}
- fun source(source: (Request, URL) -> File): Request {
+ class SourceStream(val filename: String, val stream: InputStream)
+ fun sourceStream(source: (Request, URL) -> SourceStream): Request {
+ val uploadTaskRequest = taskRequest as? UploadTaskRequest ?: throw IllegalStateException("source is only used with RequestType.UPLOAD")
+
+ uploadTaskRequest.apply {
+ sourceStreamCallback = source
+ }
+ return this
+ }
+
+ fun sourceFile(source: (Request, URL) -> File): Request {
val uploadTaskRequest = taskRequest as? UploadTaskRequest ?: throw IllegalStateException("source is only used with RequestType.UPLOAD")
uploadTaskRequest.apply {
- sourceCallback = source
+ sourceFileCallback = source
+ }
+ return this
+ }
+
+ fun destinationStream(destination: (Request, URL) -> OutputStream): Request {
+ val downloadTaskRequest = taskRequest as? DownloadTaskRequest ?: throw IllegalStateException("destination is only used with RequestType.DOWNLOAD")
+
+ downloadTaskRequest.apply {
+ destinationStreamCallback = destination
}
return this
}
- fun destination(destination: (Response, URL) -> File): Request {
+ fun destinationFile(destination: (Request, URL) -> File): Request {
val downloadTaskRequest = taskRequest as? DownloadTaskRequest ?: throw IllegalStateException("destination is only used with RequestType.DOWNLOAD")
downloadTaskRequest.apply {
- destinationCallback = destination
+ destinationFileCallback = destination
}
return this
}
+ fun useFormForUpload(use: Boolean): Request {
+ useFormForUpload = use
+ return this
+ }
+
fun interrupt(interrupt: (Request) -> Unit): Request {
taskRequest.apply {
interruptCallback = interrupt
@@ -194,15 +221,17 @@ class Request {
override fun toString(): String {
val elements = arrayListOf("--> $httpMethod (${url.toString()})")
- //body
- elements.add("Body : ${ if (httpBody.size != 0) String(httpBody) else "(empty)"}")
-
//headers
elements.add("Headers : (${httpHeaders.size})")
for ((key, value) in httpHeaders) {
elements.add("$key : $value")
}
+ //body
+ elements.add(" ")
+ elements.add("Body : ${ if (httpBody.size != 0) httpBody.size.toString() else "(empty)"}")
+ elements.add(httpBody.copyOfRange(0, Math.min(1024, httpBody.size)).toReadableString())
+
return elements.joinToString("\n").toString()
}
diff --git a/kittinunf/fuel/core/Response.kt b/kittinunf/fuel/core/Response.kt
index 2b3dd28..6a6c483 100755
--- a/kittinunf/fuel/core/Response.kt
+++ b/kittinunf/fuel/core/Response.kt
@@ -1,10 +1,9 @@
package com.github.kittinunf.fuel.core
+import com.github.kittinunf.fuel.android.utils.toReadableString
import java.net.URL
-class Response {
-
- lateinit var url: URL
+class Response(val url: URL) {
var httpStatusCode = -1
var httpResponseMessage = ""
@@ -23,16 +22,17 @@ class Response {
//content length
elements.add("Length : $httpContentLength")
- //body
- elements.add("Body : ${ if (data.size != 0) String(data) else "(empty)"}")
-
- //headers
//headers
elements.add("Headers : (${httpResponseHeaders.size})")
for ((key, value) in httpResponseHeaders) {
elements.add("$key : $value")
}
+ //body
+ elements.add(" ")
+ elements.add("Body : ${ if (data.size != 0) data.size.toString() else "(empty)"}")
+ elements.add(data.copyOfRange(0, Math.min(2048, data.size)).toReadableString())
+
return elements.joinToString("\n").toString()
}
diff --git a/kittinunf/fuel/core/requests/UploadTaskRequest.kt b/kittinunf/fuel/core/requests/UploadTaskRequest.kt
index a2a35a8..d5f8e9e 100755
--- a/kittinunf/fuel/core/requests/UploadTaskRequest.kt
+++ b/kittinunf/fuel/core/requests/UploadTaskRequest.kt
@@ -6,10 +6,7 @@ import com.github.kittinunf.fuel.core.Request
import com.github.kittinunf.fuel.core.Response
import com.github.kittinunf.fuel.util.copyTo
import com.github.kittinunf.fuel.util.toHexString
-import java.io.ByteArrayOutputStream
-import java.io.File
-import java.io.FileInputStream
-import java.io.InterruptedIOException
+import java.io.*
import java.net.URL
import java.net.URLConnection
@@ -17,42 +14,83 @@ class UploadTaskRequest(request: Request) : TaskRequest(request) {
val BUFFER_SIZE = 1024
- val CRLF = "\\r\\n"
- val boundary = System.currentTimeMillis().toHexString()
+ val CRLF = "\r\n"
var progressCallback: ((Long, Long) -> Unit)? = null
- lateinit var sourceCallback: ((Request, URL) -> File)
+ var sourceFileCallback: ((Request, URL) -> File)? = null
+ var sourceStreamCallback: ((Request, URL) -> Request.SourceStream)? = null
- var dataStream: ByteArrayOutputStream? = null
- var fileInputStream: FileInputStream? = null
+ private fun StringBuilder.writeParam(boundary: String, name: String, value: String) {
+ append(boundary).append(CRLF)
+ append("Content-Disposition: form-data; name=\"").append(name).append("\"").append(CRLF)
+ append(CRLF)
+ append(value).append(CRLF)
+ }
+
+ private fun StringBuilder.writeArray(boundary: String, namePrefix: String, array: Array<*>) {
+ val name = "$namePrefix[]"
+ for (value in array) {
+ when (value) {
+ is String -> writeParam(boundary, name, value)
+ else -> writeParam(boundary, name, value.toString())
+ }
+ }
+ }
override fun call(): Response {
+ val sourceFileCallback = sourceFileCallback
+ val sourceStreamCallback = sourceStreamCallback
+
+ var fileInputStream: InputStream? = null
+ val filename: String
+
try {
- val file = sourceCallback.invoke(request, request.url)
- //file input
- fileInputStream = FileInputStream(file)
- dataStream = ByteArrayOutputStream().apply {
- write(("--" + boundary + CRLF).toByteArray())
- write(("Content-Disposition: form-data; filename=\"" + file.name + "\"").toByteArray())
- write(CRLF.toByteArray())
- write(("Content-Type: " + URLConnection.guessContentTypeFromName(file.name)).toByteArray())
- write(CRLF.toByteArray())
- write(CRLF.toByteArray())
- flush()
-
- //input file data
- fileInputStream!!.copyTo(this, BUFFER_SIZE) { writtenBytes ->
- progressCallback?.invoke(writtenBytes, file.length())
+ if (sourceFileCallback != null) {
+ val file = sourceFileCallback.invoke(request, request.url)
+ filename = file.name
+ fileInputStream = FileInputStream(file)
+ } else if (sourceStreamCallback != null) {
+ val source = sourceStreamCallback.invoke(request, request.url)
+ filename = source.filename
+ fileInputStream = source.stream
+ } else
+ throw IllegalStateException("no source specified for RequestType.UPLOAD")
+
+ if (request.useFormForUpload) {
+ val boundary = "--${request.boundary}"
+ val payloadPrefix = StringBuilder()
+
+ request.parameters?.let {
+ for ((name, value) in it) {
+ when (value) {
+ is String -> payloadPrefix.writeParam(boundary, name, value)
+ is Array<*> -> payloadPrefix.writeArray(boundary, name, value)
+ }
+ }
}
- write(CRLF.toByteArray())
- flush()
- write(("--$boundary--").toByteArray())
- write(CRLF.toByteArray())
- flush()
- }
+ payloadPrefix
+ .append(boundary).append(CRLF)
+ .append("Content-Disposition: form-data; name=\"userfile\"; filename=\"").append(filename).append("\"").append(CRLF)
+ .append("Content-Type: ").append(URLConnection.guessContentTypeFromName(filename)).append(CRLF)
+ .append(CRLF)
- request.body(dataStream!!.toByteArray())
+
+ val payloadSuffix =
+ CRLF +
+ "--${request.boundary}--" + CRLF
+
+ request.httpBody = payloadPrefix.toString().toByteArray()
+ request.httpBodySuffix = payloadSuffix.toByteArray()
+ request.httpBodyStream = fileInputStream
+ } else {
+ request.httpHeaders["Content-Disposition"] = "attachment; filename=\"$filename\"; filename*=UTF-8''$filename"
+ request.httpHeaders["Content-Type"] = "application/octet-stream"
+ if (request.httpBody.size != 0)
+ request.httpBody = ByteArray(0)
+ request.httpBodyStream = fileInputStream
+ request.httpBodySuffix = null
+ }
return Manager.instance.client.executeRequest(request).apply { dispatchCallback(this) }
} catch(error: FuelError) {
if (error.exception as? InterruptedIOException != null) {
@@ -60,7 +98,6 @@ class UploadTaskRequest(request: Request) : TaskRequest(request) {
}
throw error
} finally {
- dataStream?.close()
fileInputStream?.close()
}
}
@Dima-369
Copy link

Dima-369 commented May 20, 2017

Thanks a lot for this!
Fuel.download works flawlessly with the .progress callback :)


However, I noticed that the .progress callback does not work correctly when used together with Fuel.upload.
The reported byte counts do not correspond to the bytes actually sent over the network; they seem to reflect reading the file from the filesystem, so the callback completes within a few milliseconds, as soon as the file has finished loading.

Do you know how to fix this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment