Skip to content

Instantly share code, notes, and snippets.

@simon-mo
Created May 11, 2023 22:43
Show Gist options
  • Save simon-mo/cda85194f4bcfcfaaf0779851aef440f to your computer and use it in GitHub Desktop.
Save simon-mo/cda85194f4bcfcfaaf0779851aef440f to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# s3://feature-store-datasets/wikipedia/diffs/\n",
"\n",
"# NOTE: this doesn't deal with nested prefixes right now (i.e. no / in the name). It assumes a flat layout.\n",
"# You can use https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#AmazonS3-ListObjectsV2-response-CommonPrefixes to find\n",
"# the next / delimiter."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import string\n",
"\n",
"# Can we use a regex that enumerates? If we use the full alphanumeric set, the search space blows up quickly.\n",
"# Another approach is to prioritize the patterns we have seen before, using some sort of epsilon-greedy style exploration.\n",
"alphanumeric = string.digits\n",
"bucket_name = \"feature-store-datasets\"\n",
"search_prefix = \"wikipedia/diffs/\"\n",
"\n",
"s3 = boto3.client(\"s3\")\n",
"\n",
"\n",
"def probe_once(prefix):\n",
" resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, MaxKeys=1)\n",
" return {\"prefix_searched\": prefix, \"has_objects\": resp[\"KeyCount\"] > 0}"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Searching 10 prefixes\n",
"Found 1 prefixes\n",
"Searching 10 prefixes\n",
"Found 1 prefixes\n",
"Searching 10 prefixes\n",
"Found 4 prefixes\n",
"Searching 40 prefixes\n",
"Found 14 prefixes\n",
"Searching 140 prefixes\n",
"Found 66 prefixes\n",
"Searching 660 prefixes\n",
"Found 497 prefixes\n",
"CPU times: user 5.62 s, sys: 3.91 s, total: 9.53 s\n",
"Wall time: 2.67 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"from concurrent.futures import ThreadPoolExecutor, as_completed\n",
"\n",
"search_round = 10 # number of rounds; each round extends the matching prefixes by one digit from [0-9]\n",
"stop_at_num_found = 400 # stop searching when we found this many prefixes\n",
"\n",
"thread_pool = ThreadPoolExecutor(max_workers=100)\n",
"\n",
"to_search_tasks = [search_prefix + char for char in alphanumeric]\n",
"found_prefix = []\n",
"\n",
"for _ in range(search_round):\n",
" found_prefix.clear()\n",
"\n",
" # submit all the tasks\n",
" print(f\"Searching {len(to_search_tasks)} prefixes\")\n",
" tasks = [thread_pool.submit(probe_once, prefix) for prefix in to_search_tasks]\n",
"\n",
" # iterate over the task as they complete\n",
" for future in as_completed(tasks):\n",
" assert future.done()\n",
" if future.exception():\n",
" print(f\"Exception {future.exception()}\")\n",
" raise future.exception()\n",
" elif future.result() and future.result()[\"has_objects\"]:\n",
" found_prefix.append(future.result()[\"prefix_searched\"])\n",
" print(f\"Found {len(found_prefix)} prefixes\")\n",
" if len(found_prefix) > stop_at_num_found:\n",
" break\n",
"\n",
" # update the search space\n",
" to_search_tasks = [prefix + char for prefix in found_prefix for char in alphanumeric]"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Working on 497 prefixes\n",
"Number of objects found 0\n",
"Number of objects found 9476\n",
"Number of objects found 29973\n",
"Number of objects found 58030\n",
"CPU times: user 5.61 s, sys: 2.63 s, total: 8.23 s\n",
"Wall time: 4.8 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"from concurrent.futures import ThreadPoolExecutor, as_completed\n",
"from queue import Queue\n",
"import time\n",
"\n",
"# full listing!\n",
"print(f\"Working on {len(found_prefix)} prefixes\")\n",
"\n",
"thread_pool = ThreadPoolExecutor(max_workers=100)\n",
"queue = Queue()\n",
"total_obj_count = 0\n",
"\n",
"\n",
"def list_objects_full(prefix):\n",
" continuation_token = None\n",
"\n",
" while True:\n",
" resp = s3.list_objects_v2(\n",
" Bucket=bucket_name, Prefix=prefix, MaxKeys=1000, **({\"ContinuationToken\": continuation_token} if continuation_token else {})\n",
" )\n",
" # print(f\"Found {len(resp['Contents'])} objects for {prefix}\")\n",
"\n",
" # put the content in the queue\n",
" queue.put(resp[\"Contents\"])\n",
"\n",
" global total_obj_count\n",
" total_obj_count += len(resp[\"Contents\"])\n",
"\n",
" # check if we need to continue\n",
" if not resp[\"IsTruncated\"]:\n",
" break\n",
"\n",
" continuation_token = resp[\"NextContinuationToken\"]\n",
"\n",
"\n",
"tasks = [thread_pool.submit(list_objects_full, prefix) for prefix in found_prefix]\n",
"while not all(task.done() for task in tasks):\n",
" print(f\"Number of objects found {total_obj_count}\")\n",
" time.sleep(1)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"66102"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"full_result = []\n",
"while not queue.empty():\n",
" full_result.extend(queue.get())\n",
"len(full_result)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "base",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.13"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment