Skip to content

Instantly share code, notes, and snippets.

@jay
Last active August 24, 2023 09:53
Show Gist options
  • Save jay/f355d98e87fde19b1455b0b31dd118fd to your computer and use it in GitHub Desktop.
Save jay/f355d98e87fde19b1455b0b31dd118fd to your computer and use it in GitHub Desktop.
Parse a stream received by libcurl into sections.
/* Parse a stream received by libcurl into sections.
Usage: ParseStream
This program is the same as libcurl example getinmemory.c except that it also
parses the received data into sections.
A data section consists of attributes and data and is received in this format
(both size fields are unsigned little-endian integers):
[[<4 bytes: attribute size><attributes><8 bytes: data size><data>]...]
Here is an example of 4 sections {attributes, data}, followed by a command that
serves those sections as a stream split into arbitrarily sized chunks:
{ "foo", "bar" },
{ "", "" },
{ "baz", "" },
{ "", "qux" },
while true; do perl -e 'print
"HTTP/1.1 200 OK\r\n" .
"Transfer-Encoding: chunked\r\n" .
"\r\n" .
"2A\r\n" .
"\x03\x00\x00\x00\x66\x6f\x6f\x03\x00\x00\x00\x00\x00\x00\x00" .
"\x62\x61\x72\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" .
"\x03\x00\x00\x00\x62\x61\x7a\x00\x00\x00\x00\x00\r\n" .
"12\r\n" .
"\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00" .
"\x71\x75\x78\r\n" .
"0\r\n\r\n"
' | nc -4l localhost 8000; done
This program contains code from:
https://github.com/curl/curl/blob/curl-7_54_0/docs/examples/debug.c
https://github.com/curl/curl/blob/curl-7_54_0/docs/examples/getinmemory.c
https://github.com/curl/curl/blob/curl-7_54_0/lib/curl_endian.c
curl-library mailing list thread:
'streaming data with libcurl'
https://curl.haxx.se/mail/lib-2017-04/0129.html
Copyright (C) 2017 Jay Satiro <raysatiro@yahoo.com>
http://curl.haxx.se/docs/copyright.html
https://gist.github.com/jay/f355d98e87fde19b1455b0b31dd118fd
*/
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) 1998 - 2015, Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.haxx.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
***************************************************************************/
/* <DESC>
* Shows how the write callback function can be used to download data into a
* chunk of memory instead of storing it in a file.
* </DESC>
*/
#include <assert.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <curl/curl.h>
/* Force FALSE and TRUE to be exactly 0 and 1, discarding any definitions that
 * the headers included above may have provided.
 */
#undef FALSE
#define FALSE 0
#undef TRUE
#define TRUE 1
/* Byte buffer that accumulates received stream data until it has been parsed.
 * WriteMemoryCallback appends to it and ParseStream removes completed
 * sections from its front.
 */
struct MemoryStruct {
/* heap buffer; allocated with malloc in main, grown with realloc */
char *memory;
/* number of bytes in memory that currently hold unparsed stream data */
size_t size;
/* number of bytes allocated for memory */
size_t allocated_size;
};
/* Context passed to the write callback through CURLOPT_WRITEDATA. */
struct UserStruct {
/* easy handle of the transfer; queried for the HTTP response code */
CURL *curl_handle;
/* buffer holding received data that has not been parsed yet */
struct MemoryStruct *mem;
/* running total of data sections handled so far by ParseStream */
unsigned long long total_sections_parsed;
};
/* Decode an unsigned 32-bit little-endian integer.
 *
 * buf must point to at least 4 readable bytes; buf[0] is the least
 * significant byte of the result.
 */
unsigned int read32_le(const unsigned char *buf)
{
  unsigned int value = 0;
  int idx;

  /* fold the bytes in, starting with the most significant one (buf[3]) */
  for(idx = 3; idx >= 0; idx--)
    value = (value << 8) | (unsigned int)buf[idx];

  return value;
}
/* Decode an unsigned 64-bit little-endian integer.
 *
 * buf must point to at least 8 readable bytes; buf[0] is the least
 * significant byte of the result.
 */
unsigned long long read64_le(const unsigned char *buf)
{
  unsigned long long value = 0;
  int idx;

  /* fold the bytes in, starting with the most significant one (buf[7]) */
  for(idx = 7; idx >= 0; idx--)
    value = (value << 8) | (unsigned long long)buf[idx];

  return value;
}
/* Write a hex dump of a buffer to a stream.
 *
 * text    label printed in the header line
 * stream  destination stream
 * ptr     bytes to dump; only read when size is non-zero
 * size    number of bytes at ptr
 *
 * Output is a header line followed by one row per 16 bytes. Each row shows
 * the offset, the bytes in hex, and the same bytes as ASCII text.
 */
void dump(const char *text, FILE *stream,
          const unsigned char *ptr, unsigned long long size)
{
  const unsigned int width = 0x10;
  unsigned long long i;
  unsigned int c;

  fprintf(stream, "%s length is %llu bytes (0x%llx)\n",
          text, size, size);

  for(i = 0; i < size; i += width) {
    fprintf(stream, "%8.8llx: ", i);

    /* show hex to the left */
    for(c = 0; c < width; c++) {
      if(i + c < size)
        fprintf(stream, "%02x ", ptr[i + c]);
      else
        /* pad with the same 3 columns a printed byte occupies so that the
           text column stays aligned on a short final row */
        fputs("   ", stream);
    }

    /* show data on the right */
    for(c = 0; (c < width) && (i + c < size); c++) {
      const unsigned char byte = ptr[i + c];
      /* only printable ASCII is shown as-is; 0x7f (DEL) is a control
         character and is replaced like every other non-printable byte */
      const char x = (byte >= 0x20 && byte < 0x7f) ? (char)byte : '.';
      fputc(x, stream);
    }

    fputc('\n', stream); /* newline */
  }
}
/* Report one parsed data section by printing it to stdout.
 *
 * attrib / attrib_size  the section's attribute bytes and their count
 * data / data_size      the section's data bytes and their count
 *
 * The section is printed between two border lines as two hex dumps
 * (attributes, then data) and stdout is flushed afterwards.
 *
 * Always returns TRUE.
 */
int Notify(char *attrib, unsigned int attrib_size,
           char *data, unsigned long long data_size)
{
  static const char separator[] =
    "******************************************************"
    "********************";
  FILE *sink = stdout;

  fputs(separator, sink);
  fputc('\n', sink);
  fputs("Notify: A data section has been received.\n\n", sink);

  dump("Attributes", sink, (unsigned char *)attrib, attrib_size);
  fputc('\n', sink);
  dump("Data", sink, (unsigned char *)data, data_size);

  fputs(separator, sink);
  fputs("\n\n", sink);
  fflush(sink);

  return TRUE;
}
/* Upper bounds accepted for the two size fields of a data section. */
#define MAX_ATTRIBUTES_SIZE (unsigned int)(1048576)
#define MAX_DATA_SIZE (unsigned long long)(100 * 1048576)

/* Parse, notify and remove data sections from memory.
 *
 * Every complete section found at the front of user->mem is passed to
 * Notify. An incomplete trailing section is left in the buffer so parsing
 * can resume once more data has been appended. The bytes of all handled
 * sections are then removed from the front of the buffer.
 *
 * Return the number of data sections successfully handled or -1 on error
 * (section counter exhausted, a size field above its maximum, or Notify
 * failed). Sections handled before the error are still removed.
 */
long long ParseStream(struct UserStruct *user)
{
  struct MemoryStruct *buffer = user->mem;
  char *base = buffer->memory;
  const size_t total = buffer->size;
  size_t consumed = 0; /* bytes that belong to fully handled sections */
  long long handled = 0;

  for(;;) {
    size_t cursor = consumed; /* read position inside the current section */
    char *attrib_ptr, *data_ptr;
    unsigned int attrib_len;
    unsigned long long data_len;

    if(user->total_sections_parsed == LLONG_MAX) {
      fprintf(stderr, "Error: Maximum number of sections already parsed.\n");
      handled = -1;
      break;
    }

    /* attributes: 4 byte size field followed by that many bytes */
    if(total - cursor < 4)
      break;

    attrib_len = read32_le((unsigned char *)(base + cursor));

    if(attrib_len > MAX_ATTRIBUTES_SIZE) {
      fprintf(stderr, "Error: received attributes size > maximum. %u > %u\n",
              attrib_len, MAX_ATTRIBUTES_SIZE);
      handled = -1;
      break;
    }

    cursor += 4;

    if(total - cursor < attrib_len)
      break;

    attrib_ptr = base + cursor;
    cursor += attrib_len;

    /* data: 8 byte size field followed by that many bytes */
    if(total - cursor < 8)
      break;

    data_len = read64_le((unsigned char *)(base + cursor));

    if(data_len > MAX_DATA_SIZE) {
      fprintf(stderr, "Error: received data size > maximum. %llu > %llu\n",
              data_len, MAX_DATA_SIZE);
      handled = -1;
      break;
    }

    cursor += 8;

    if(total - cursor < data_len)
      break;

    data_ptr = base + cursor;
    cursor += (size_t)data_len;

    /* the section is complete, hand it over */
    if(!Notify(attrib_ptr, attrib_len, data_ptr, data_len)) {
      fprintf(stderr, "Error: Notify failed.\n");
      handled = -1;
      break;
    }

    /* only now does the section count as consumed */
    consumed = cursor;
    ++handled;
    ++user->total_sections_parsed;
  }

  /* remove completed data sections from memory */
  if(consumed != 0) {
    memmove(base, base + consumed, total - consumed);
    buffer->size = total - consumed;
  }

  return handled;
}
/* Write callback: append the received bytes to the user's memory buffer and
 * parse any data sections that are now complete.
 *
 * contents  received bytes
 * size      size of one element
 * nmemb     number of elements; size * nmemb bytes are available
 * userp     the struct UserStruct registered with CURLOPT_WRITEDATA
 *
 * Returns size * nmemb on success. Returns a different value when memory
 * cannot be allocated or ParseStream reports an error. Bodies of responses
 * other than 200 are accepted but neither stored nor parsed.
 */
static size_t
WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp)
{
  struct UserStruct *ctx = (struct UserStruct *)userp;
  struct MemoryStruct *store = ctx->mem;
  const size_t incoming = size * nmemb;
  /* failure is signalled by returning anything other than the byte count
     passed in, so pick a value that differs from incoming */
  const size_t abort_code = (incoming == 0) ? 1 : 0;
  long http_status = 0;
  size_t needed;

  curl_easy_getinfo(ctx->curl_handle, CURLINFO_RESPONSE_CODE, &http_status);

  /* Don't attempt to parse server responses other than 200 OK */
  if(http_status != 200)
    return incoming;

  /* grow the buffer when the new bytes do not fit */
  needed = store->size + incoming;
  if(needed > store->allocated_size) {
    char *grown = realloc(store->memory, needed);
    if(grown == NULL) {
      fprintf(stderr, "Error: Out of memory.\n");
      return abort_code;
    }
    store->memory = grown;
    store->allocated_size = needed;
  }

  memcpy(store->memory + store->size, contents, incoming);
  store->size = needed;

  return (ParseStream(ctx) < 0) ? abort_code : incoming;
}
/* Download a URL with libcurl, parse the received stream into data sections
 * as it arrives, and print transfer statistics to stderr.
 *
 * Command line arguments are ignored.
 *
 * Returns EXIT_SUCCESS when the transfer completed without error, otherwise
 * EXIT_FAILURE (also when memory or libcurl could not be initialized).
 */
int main(int argc, char *argv[])
{
  CURL *curl_handle;
  CURLcode res;
  struct MemoryStruct chunk;
  struct UserStruct user;
  double average_speed = 0, bytes_downloaded = 0, total_download_time = 0;

  (void)argc;
  (void)argv;

  /* will be grown as needed by the realloc in WriteMemoryCallback */
  chunk.allocated_size = 1;
  chunk.memory = malloc(chunk.allocated_size);
  if(!chunk.memory) {
    fprintf(stderr, "Error: Out of memory.\n");
    return EXIT_FAILURE;
  }
  chunk.size = 0; /* no data at this point */

  /* no other libcurl function may be used if global init fails */
  res = curl_global_init(CURL_GLOBAL_ALL);
  if(res != CURLE_OK) {
    fprintf(stderr, "curl_global_init() failed: %s\n",
            curl_easy_strerror(res));
    free(chunk.memory);
    return EXIT_FAILURE;
  }

  /* init the curl session */
  curl_handle = curl_easy_init();
  if(!curl_handle) {
    /* a NULL handle cannot be used with any other easy function */
    fprintf(stderr, "Error: curl_easy_init() failed.\n");
    curl_global_cleanup();
    free(chunk.memory);
    return EXIT_FAILURE;
  }

  /* fill out our user struct, which is passed to the write callback */
  user.curl_handle = curl_handle;
  user.mem = &chunk;
  user.total_sections_parsed = 0;

  /* specify URL to get */
  curl_easy_setopt(curl_handle, CURLOPT_URL, "http://www.example.com/");

  /* send all data to this function */
  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);

  /* we pass our user struct to the callback function */
  curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, &user);

  /* some servers don't like requests that are made without a user-agent
     field, so we provide one */
  curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0");

  /* get it! */
  res = curl_easy_perform(curl_handle);

  /* check for errors */
  if(res != CURLE_OK) {
    fprintf(stderr, "curl_easy_perform() failed: %s\n",
            curl_easy_strerror(res));
  }

  /* show download stats; the values stay 0 if a query fails */
  curl_easy_getinfo(curl_handle, CURLINFO_SPEED_DOWNLOAD, &average_speed);
  curl_easy_getinfo(curl_handle, CURLINFO_SIZE_DOWNLOAD, &bytes_downloaded);
  curl_easy_getinfo(curl_handle, CURLINFO_TOTAL_TIME, &total_download_time);

  fprintf(stderr, "\nTransfer rate: %.0f KB/sec"
          " (%.0f bytes in %.0f seconds)\n",
          average_speed / 1024, bytes_downloaded, total_download_time);

  fprintf(stderr, "\nParsed %llu data sections from stream.\n",
          user.total_sections_parsed);

  /* bytes left in the buffer belong to a section that never completed */
  if(chunk.size)
    fprintf(stderr, "\nWARNING: Found unparsed stream data.\n");

  /* cleanup curl stuff */
  curl_easy_cleanup(curl_handle);

  free(chunk.memory);

  /* we're done with libcurl, so clean it up */
  curl_global_cleanup();

  return (res == CURLE_OK ? EXIT_SUCCESS : EXIT_FAILURE);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment