Skip to content

Instantly share code, notes, and snippets.

@senz
Created July 17, 2015 12:15
Show Gist options
  • Save senz/9b957471a257efc61afe to your computer and use it in GitHub Desktop.
Save senz/9b957471a257efc61afe to your computer and use it in GitHub Desktop.
Mixpanel API supporting extension and sending ip address
package com.mixpanel.mixpanelapi
import java.io._
import java.net.{URLEncoder, URLConnection, URL, InetAddress}
import scala.collection.JavaConverters._
import org.json.{JSONArray, JSONObject}
class ExtendedMixpanelAPI(protected final val eventsEndpoint: String, protected final val peopleEndpoint: String) {
/**
* Constructs a MixpanelAPI object associated with the production, Mixpanel services.
*/
def this() {
this(Config.BASE_ENDPOINT + "/track", Config.BASE_ENDPOINT + "/engage")
}
/**
* Sends a single message to Mixpanel servers.
*
* Each call to sendMessage results in a blocking call to remote Mixpanel servers.
* To send multiple messages at once, see #{@link #deliver(ClientDelivery)}
*
* @param message A JSONObject formatted by #{ @link MessageBuilder}
* @throws MixpanelMessageException if the given JSONObject is not (apparently) a Mixpanel message. This is a RuntimeException, callers should take care to submit only correctly formatted messages.
* @throws IOException if
*/
@throws(classOf[MixpanelMessageException])
@throws(classOf[IOException])
def sendMessage(message: JSONObject) {
val delivery: ClientDelivery = new ClientDelivery
delivery.addMessage(message)
deliver(delivery)
}
/**
* Attempts to send a given delivery to the Mixpanel servers. Will block,
* possibly on multiple server requests. For most applications, this method
* should be called in a separate thread or in a queue consumer.
*
* @param toSend a ClientDelivery containing a number of Mixpanel messages
* @param ipAddress an ip address to be used in event geo localization
* @throws IOException
* @see ClientDelivery
*/
@throws(classOf[IOException])
def deliver(toSend: ClientDelivery, ipAddress: IpAddressParam = IpAddressParam.IgnoreIp) {
import scala.collection.JavaConverters._
val ip = "ip=" + (ipAddress match {
case IpAddressParam.IgnoreIp => "0"
case IpAddressParam.UseSenderIp => "1"
case IpAddressParam.SpecifiedIp(ipAddr) => ipAddr.getHostAddress
})
val eventsUrl = eventsEndpoint + "?" + ip
val events = toSend.getEventsMessages.asScala.toList
sendMessages(events, eventsUrl)
val peopleUrl = peopleEndpoint + "?" + ip
val people = toSend.getPeopleMessages.asScala.toList
sendMessages(people, peopleUrl)
}
/**
* Package scope for mocking purposes
*/
@throws(classOf[IOException])
private[mixpanelapi] def sendData(dataString: String, endpointUrl: String): Boolean = {
val endpoint: URL = new URL(endpointUrl)
val conn: URLConnection = endpoint.openConnection
conn.setReadTimeout(ReadTimeoutMs)
conn.setDoOutput(true)
conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=utf8")
var utf8data: Array[Byte] = null
try {
utf8data = dataString.getBytes("utf-8")
}
catch {
case e: UnsupportedEncodingException => {
throw new RuntimeException("Mixpanel library requires utf-8 support", e)
}
}
val base64data: String = new String(Base64Coder.encode(utf8data))
val encodedData: String = URLEncoder.encode(base64data, "utf8")
val encodedQuery: String = "data=" + encodedData
var postStream: OutputStream = null
try {
postStream = conn.getOutputStream
postStream.write(encodedQuery.getBytes)
} finally {
if (postStream != null) {
try {
postStream.close()
}
catch {
case e: IOException =>
}
}
}
var responseStream: InputStream = null
var response: String = null
try {
responseStream = conn.getInputStream
response = slurp(responseStream)
} finally {
if (responseStream != null) {
try {
responseStream.close()
}
catch {
case e: IOException =>
}
}
}
(response != null) && (response == "1")
}
@throws(classOf[IOException])
protected def sendMessages(messages: List[JSONObject], endpointUrl: String) {
{
var i: Int = 0
while (i < messages.size) {
{
var endIndex: Int = i + Config.MAX_MESSAGE_SIZE
endIndex = Math.min(endIndex, messages.size)
val batch = messages.slice(i, endIndex)
if (batch.nonEmpty) {
val messagesString: String = dataString(batch)
if (!sendData(messagesString, endpointUrl)) {
throw new MixpanelServerException("Server refused to accept messages, they may be malformed.", batch.asJava)
}
}
}
i += Config.MAX_MESSAGE_SIZE
}
}
}
private def dataString(messages: List[JSONObject]): String = {
val array = new JSONArray
messages.foreach(array.put)
array.toString
}
@throws(classOf[IOException])
private def slurp(in: InputStream): String = {
val out = new StringBuilder
val reader = new InputStreamReader(in, "utf8")
val readBuffer = new Array[Char](BufferSize)
var readCount = 0
do {
readCount = reader.read(readBuffer)
if (readCount > 0) {
out.append(readBuffer, 0, readCount)
}
} while (readCount != -1)
out.toString()
}
protected val BufferSize: Int = 256
protected val ReadTimeoutMs: Int = 120000
}
sealed trait IpAddressParam
object IpAddressParam {
case object IgnoreIp extends IpAddressParam
case object UseSenderIp extends IpAddressParam
case class SpecifiedIp(ip: InetAddress) extends IpAddressParam
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment