Skip to content

Instantly share code, notes, and snippets.

@TrQ-Hoan
Last active April 25, 2023 17:54
Show Gist options
  • Save TrQ-Hoan/eabd31f512b004a7a9b72fed05c1a161 to your computer and use it in GitHub Desktop.
Save TrQ-Hoan/eabd31f512b004a7a9b72fed05c1a161 to your computer and use it in GitHub Desktop.
Download a directory from a public Bitbucket repository (Python 3 & Go >= 1.19)
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"strings"
)
// counter is the index of the spinner frame most recently drawn by
// pretty_print; it cycles through 0..3.
var counter = 0

// pretty_print redraws the current terminal line as " <message> <frame>",
// advancing a four-frame spinner by one step on every call.
func pretty_print(message string) {
	frames := []string{"\\", "|", "/", "-"}

	// Step to the next frame, wrapping after the fourth one.
	counter++
	counter %= 4

	line := fmt.Sprintf(" %s %s\r", message, frames[counter])

	// ESC[2K erases the whole line and ESC[1G moves the cursor to column 1,
	// so each call overwrites the previous status instead of scrolling.
	fmt.Printf("\033[2K\033[1G%s", line)
}
// srcEntry is the subset of one element of a listing's "values" array that
// openDirectory reads.
type srcEntry struct {
	Type        string `json:"type"`
	Path        string `json:"path"`
	EscapedPath string `json:"escaped_path"`
	Links       struct {
		Self struct {
			Href string `json:"href"`
		} `json:"self"`
	} `json:"links"`
}

// srcListing is one page of a directory listing: its entries plus the URL of
// the following page, which is empty on the last page.
type srcListing struct {
	Values []srcEntry `json:"values"`
	Next   string     `json:"next"`
}

// exitOnError prints err and terminates the program with status 1 when err is
// non-nil; it does nothing otherwise.
func exitOnError(err error) {
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

// fetchListing GETs listingURL and decodes the JSON body into a srcListing.
// Any transport error, non-200 status or malformed body terminates the
// program with status 1.
func fetchListing(listingURL string) srcListing {
	resp, err := http.Get(listingURL)
	exitOnError(err)
	defer resp.Body.Close()

	pretty_print("Loading")
	if resp.StatusCode != http.StatusOK {
		fmt.Printf(" URL %s not found\n", listingURL)
		os.Exit(1)
	}

	data, err := ioutil.ReadAll(resp.Body)
	exitOnError(err)

	var listing srcListing
	exitOnError(json.Unmarshal(data, &listing))
	return listing
}

// downloadFile saves the file described by entry at entry.Path, relative to
// the current working directory, creating parent directories as needed.
// The response body and the output file are closed before it returns, so a
// large tree does not accumulate open descriptors.
func downloadFile(entry srcEntry) {
	// The directory is derived from the same field as the file name so the
	// two cannot diverge when escaped_path differs from path.
	exitOnError(os.MkdirAll(path.Dir(entry.Path), 0755))

	fmt.Printf(" Downloading %s\n", entry.EscapedPath)
	href := entry.Links.Self.Href
	resp, err := http.Get(href)
	exitOnError(err)
	defer resp.Body.Close()

	// Refuse to store an error page as if it were the file's content.
	if resp.StatusCode != http.StatusOK {
		fmt.Printf(" URL %s not found\n", href)
		os.Exit(1)
	}

	out, err := os.Create(entry.Path)
	exitOnError(err)
	if _, err := io.Copy(out, resp.Body); err != nil {
		out.Close()
		exitOnError(err)
	}
	// Close explicitly: a failed close can mean the data never reached disk.
	exitOnError(out.Close())
}

// openDirectory recursively downloads everything listed under
// apiPath/username/repoSlug/myPath.
//
// Entries of type "commit_directory" are descended into, with the directory's
// base name URL-escaped and appended to myPath; every other entry is fetched
// from its links.self.href and written to its "path". All pages of a listing
// are processed by following the "next" link until it is absent. Any failure
// prints a message and terminates the program with status 1.
func openDirectory(apiPath string, username string, repoSlug string, myPath string) {
	pageURL := fmt.Sprintf("%s/%s/%s/%s", apiPath, username, repoSlug, myPath)
	for pageURL != "" {
		listing := fetchListing(pageURL)
		for _, entry := range listing.Values {
			if entry.Type == "commit_directory" {
				directory := path.Base(entry.Path)
				openDirectory(apiPath, username, repoSlug, myPath+"/"+url.PathEscape(directory))
				continue
			}
			downloadFile(entry)
		}
		pageURL = listing.Next
	}
}
// main expects exactly one argument: the browser URL of a directory in a
// public Bitbucket repository (https://bitbucket.org/<user>/<repo>/src/...).
// It splits the URL path into the user, the repository slug and the remainder,
// and hands them to openDirectory together with the API base URL. Invalid
// input prints the usage text and exits with status 1.
func main() {
	usage := func() {
		fmt.Printf(" Usage: %s https://bitbucket.org/pypy/numpy/src/master/numpy/tests/\n", os.Args[0])
		fmt.Println(" Find the url by going to the source tab of a repository and browse to the directory you want to download")
		os.Exit(1)
	}

	if len(os.Args) != 2 {
		usage()
	}
	arg := []byte(os.Args[1])
	if !bytes.HasPrefix(arg, []byte("https://bitbucket.org/")) || !bytes.Contains(arg, []byte("/src/")) {
		usage()
	}

	apiPath := "https://api.bitbucket.org/2.0/repositories"
	u, err := url.Parse(os.Args[1])
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	paths := strings.Split(strings.TrimPrefix(u.Path, "/"), "/")
	// "/src/" may have matched only in the query or fragment, leaving the path
	// too short; reject that here instead of indexing out of range below.
	if len(paths) < 3 || paths[0] == "" || paths[1] == "" {
		usage()
	}

	username := paths[0]
	repoSlug := paths[1]
	urlPath := strings.Join(paths[2:], "/")
	openDirectory(apiPath, username, repoSlug, urlPath)
}
#!/usr/bin/env python
from os import path, makedirs
import sys
from platform import system as platform_system
from urllib.request import urlopen
from urllib.parse import urlparse, quote as urlquote
try : import json
except : import simplejson as json
def pretty_print(message):
    """Redraw the current terminal line as " <message> <frame>".

    Each call advances the module-level ``counter`` by one (modulo 4) and
    uses it to pick the next frame of a four-frame spinner.
    """
    global counter
    frames = ('\\', '|', '/', "-")
    counter = (counter + 1) % 4
    # ESC[2K erases the line and ESC[1G returns the cursor to column 1, so the
    # new status overwrites the previous one.
    sys.stdout.write(u'\033[2K\033[1G')
    sys.stdout.write(" %s %s\r" % (message, frames[counter]))
    sys.stdout.flush()
def open_directory(API_PATH, username, repo_slug, mypath):
    """Recursively download everything listed under a repository directory.

    The listing is fetched from ``API_PATH/username/repo_slug/mypath``.
    Entries of type ``commit_directory`` are descended into, with the
    directory's base name URL-quoted and appended to ``mypath``. Every other
    entry is fetched from its ``links.self.href`` and written to its ``path``,
    relative to the current working directory. All pages of a listing are
    processed by following its ``next`` link until it is absent.

    Exits with status 1 if a listing does not come back with status 200.
    """
    from shutil import copyfileobj

    page_url = "%s/%s/%s/%s" % (API_PATH, username, repo_slug, mypath)
    while page_url:
        with urlopen(page_url) as json_data_url_handle:
            pretty_print("Loading")
            if json_data_url_handle.code != 200:
                print(" URL %s not found" % page_url)
                sys.exit(1)
            json_directory = json.loads(json_data_url_handle.read())

        for valuei in json_directory[u'values']:
            if valuei[u'type'] == 'commit_directory':
                directory = path.basename(valuei[u'path'])
                open_directory(API_PATH, username, repo_slug, mypath + "/" + urlquote(directory))
            else:
                # Derive the directory from the same field as the file name so
                # the two cannot diverge when escaped_path differs from path.
                target_dir = path.dirname(valuei[u'path'])
                if target_dir:
                    # exist_ok tolerates an existing directory without hiding
                    # other failures such as a permission error.
                    makedirs(target_dir, exist_ok=True)
                print(" Downloading %s" % valuei[u'escaped_path'])
                with urlopen(valuei[u'links'][u'self'][u'href']) as d, open(valuei[u'path'], "wb") as opfile:
                    # Stream in chunks rather than holding the whole file in memory.
                    copyfileobj(d, opfile)

        page_url = json_directory.get(u'next')
def _exit_with_usage():
    """Print the usage text and exit with a non-zero status."""
    print(" Usage: %s https://bitbucket.org/pypy/numpy/src/master/numpy/tests/" % sys.argv[0])
    print(" Find the url by going to the source tab of a repository and browse to the directory you want to download")
    sys.exit(1)


# Exactly one argument is expected: the browser URL of a repository directory.
if len(sys.argv) != 2 or not sys.argv[1].startswith("https://bitbucket.org/") or "/src/" not in sys.argv[1]:
    _exit_with_usage()

# The URL path must split into ['', user, repo, rest]. "/src/" may have matched
# only in the query or fragment, so check the shape before unpacking.
_path_parts = urlparse(sys.argv[1]).path.split("/", 3)
if len(_path_parts) != 4:
    _exit_with_usage()

if platform_system().lower() == 'windows':
    # pretty_print emits ANSI escape sequences; turn on the console mode bit
    # that makes the Windows console interpret them.
    from ctypes import windll, c_int, byref
    stdout_handle = windll.kernel32.GetStdHandle(c_int(-11))  # -11: standard output handle
    mode = c_int(0)
    windll.kernel32.GetConsoleMode(c_int(stdout_handle), byref(mode))
    mode = c_int(mode.value | 4)  # 4: virtual terminal processing flag
    windll.kernel32.SetConsoleMode(c_int(stdout_handle), mode)

# Spinner frame index used by pretty_print.
counter = 0
API_PATH = "https://api.bitbucket.org/2.0/repositories"
_, username, repo_slug, urlpath = _path_parts
open_directory(API_PATH, username, repo_slug, urlpath)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment