@nampdn
Last active June 3, 2023 00:06
Symato CC Distributed Computing
import trafilatura
from fastwarc.warc import ArchiveIterator
import fasttext
from tqdm import tqdm
import pypandoc
import bs4 as bs
import pandas as pd
import multiprocessing as mp
import argparse
import os
from huggingface_hub import HfApi, CommitOperationAdd

# Disable fasttext warnings
fasttext.FastText.eprint = lambda x: None

# Load the fasttext language-identification model:
# https://fasttext.cc/docs/en/language-identification.html
model = fasttext.load_model('./lid.176.bin')
def extract_text(content):
    """Extract plain text from raw HTML; for Vietnamese pages also return a Markdown rendering."""
    text = trafilatura.extract(content)
    if text is None:
        return None
    # fasttext expects a single line of text, so replace newlines before predicting
    lang = model.predict(text.replace('\n', '. '))[0][0].split('__')[-1]
    if lang == 'vi':
        soup = bs.BeautifulSoup(content, 'lxml')
        if soup.find('body'):
            soup = soup.find('body')
        if soup.find('main'):
            soup = soup.find('main')
        # Strip boilerplate tags before converting the HTML to Markdown
        list_tags = ['header', 'footer', 'script', 'style']
        for tag in list_tags:
            for div in soup.find_all(tag):
                div.decompose()
        html_string = soup.prettify()
        markdown = pypandoc.convert_text(
            html_string, to="gfm+hard_line_breaks-raw_html", format='html',
            extra_args=['--quiet'])
        item = {'text': text, 'markdown': markdown}
        return item
    else:
        return None
def extract_warc(file):
    """Extract Vietnamese pages from a WARC file and write them to a parquet file under /outputs."""
    # Collect the raw HTML payload of every HTML record in the archive
    tasks = []
    with open(file, 'rb') as stream:
        for record in tqdm(ArchiveIterator(stream, func_filter=lambda r: r.headers.get('WARC-Identified-Payload-Type') == 'text/html'),
                           desc=f'Loading {file}'):
            tasks.append(record.reader.read())

    # Run text extraction and language filtering in parallel
    items = []
    with mp.Pool(args.num_workers) as p:
        for item in tqdm(p.imap(extract_text, tasks), total=len(tasks), desc='Extracting text'):
            if item is not None:
                items.append(item)

    df = pd.DataFrame(items)
    output_parquet = os.path.join('/outputs', os.path.basename(file).replace('.warc.gz', '.parquet'))
    df.to_parquet(output_parquet)

    print("File name: ", file)
    print("Total pages: ", len(tasks))
    print("Total Vietnamese pages: ", len(df))
    print("Output: ", output_parquet)
    print("====================================")

    result = {
        'file_path': output_parquet,
        'total_page': len(tasks),
        'vi_page': len(df)
    }
    return result
def to_huggingface(item, dump_name, token):
    """Upload the parquet file to the Symato/CC-VI dataset on the Hugging Face Hub as a pull request."""
    print('Uploading to huggingface hub...')
    api = HfApi()
    operations = []
    description = ''
    path_in_repo = '{}/{}'.format(dump_name,
                                  os.path.basename(item['file_path']))
    operations.append(
        CommitOperationAdd(
            path_in_repo=path_in_repo,
            path_or_fileobj=item['file_path'],
        )
    )
    description += "\n- {}: {} vi pages out of {} pages".format(
        path_in_repo, item['vi_page'], item['total_page'])
    api.create_commit(
        repo_id='Symato/CC-VI',
        operations=operations,
        commit_message='Add parquet files to dump {}'.format(dump_name),
        commit_description=description,
        repo_type='dataset',
        create_pr=True,
        token=token
    )
    print('Done!')
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--dump', type=str,
                        help='Name of the dump the WARC file belongs to', required=True)
    parser.add_argument('--token', type=str, default='', help='Hugging Face token')
    parser.add_argument('--input_file', type=str,
                        help='Name of the WARC file (resolved under /inputs)', required=True)
    # Leave one core free when possible
    n_workers = mp.cpu_count() - 1 if mp.cpu_count() > 1 else 1
    parser.add_argument('--num_workers', type=int, default=n_workers)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    input_local_file = os.path.join("/inputs", args.input_file)
    output_parquet = extract_warc(input_local_file)
    # Only upload when a Hugging Face token was supplied
    if args.token:
        to_huggingface(output_parquet, args.dump, args.token)
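
A minimal invocation sketch for the extractor above (the file name extract.py, the dump label, and the WARC name are placeholders and assumptions, not part of the gist; lid.176.bin must sit in the working directory and /inputs and /outputs must exist):

    # Hypothetical local run; writes /outputs/<name>.parquet and, when a token is
    # supplied, opens a pull request against the Symato/CC-VI dataset.
    python extract.py --dump <dump-name> --input_file <name>.warc.gz --token <hf_token>
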
#!/usr/bin/env bash
# Bacalhau authors (c)
# Original copyright
# https://raw.githubusercontent.com/SAME-Project/SAME-installer-website/main/install_script.sh
# ------------------------------------------------------------
# Copyright (c) Microsoft Corporation and Dapr Contributors.
# Licensed under the MIT License.
# ------------------------------------------------------------
# BACALHAU CLI location
: "${BACALHAU_INSTALL_DIR:="/usr/local/bin"}"
# sudo is required to copy binary to BACALHAU_INSTALL_DIR for linux
: "${USE_SUDO:="false"}"
# Option to download pre-releases
: "${PRE_RELEASE:="false"}"
# Http request CLI
BACALHAU_HTTP_REQUEST_CLI=curl
# GitHub Organization and repo name to download release
GITHUB_ORG=bacalhau-project
GITHUB_REPO=bacalhau
# BACALHAU CLI filename
BACALHAU_CLI_FILENAME=bacalhau
BACALHAU_CLI_FILE="${BACALHAU_INSTALL_DIR}/${BACALHAU_CLI_FILENAME}"
BACALHAU_PUBLIC_KEY=$(cat <<-END
-----BEGIN PUBLIC KEY-----
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA7bXxrECk3tQfKox7MDaN
OAQ+NATnILQ9XFfYHs+4Q04lK1tHpvUEwm9OwidMJKlr+M1f/9rzLYV6RDrv0FuA
xaxGS6xrYwmLiXhDj4KwU3v5e6lHhzupsj+7LNSZ9g+ppCXcw73l5wtNmFojKQDH
vpKDzB2KKqRq7/TRenNwvMD02zuDcjGdgXSeSiyIZ6jCn9Y6pX7nPF4rOxpFSL/w
oSb5q5tVY3ZqyrNx/Bk9mBoX3a8xDqFtthuC6SjIF1t5arLih2yEpq8hOdGyyX1l
uQCYlYuIwsYZL+fj2fFzhqpmrHBB97Npw1bTjnzQ8HQIsxkrMEg9ePFfcRfWw7w6
nWBLD4JOTFOoi9SPB0BdyqvE8B+6FTlT8XbK7/VtheR4yFVHvrnVkGzIm6AnwINc
9yFlS5FbxHh0vzL5G4jTYVZrZ7YaQ/zxgZ/SHE9fcSZv4l+W2vlo1EivtOgy1Ee6
OfDFMvdHyg04qjOGxUzYDxZ4/AL+ywSm1HDXP93Oi8icKXy5OANogW4XZ5hll54g
4EBqSON/HH4eIvyWTfFG+U6DBtD0Qn4gZO9y1KUNbhDQ0Z6LOC/mKgWhPSKRdFJk
L9lmeqYFIvAnBx5rmyE7Hlzqk4pSRfggra0D2ydTV79tUQGlX5wpkwch/s4nRmZb
rZd9rvTsifOjf2jxGGu5N6ECAwEAAQ==
-----END PUBLIC KEY-----
END
)
installDocker() {
    # Check if Docker is installed
    if ! command -v docker &> /dev/null; then
        echo "Docker is not installed. Installing Docker now..."
        # Download the script and execute it
        curl -fsSL https://get.docker.com -o get-docker.sh
        sudo sh get-docker.sh
        sudo usermod -aG docker "$USER"
        # Remove the script
        rm get-docker.sh
        echo "Docker has been installed successfully"
    else
        echo "Docker is already installed"
    fi
}
installAWSCLI() {
    # Install unzip
    sudo apt-get install -y unzip
    # Download the AWS CLI package
    curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
    # Unzip the package
    unzip awscliv2.zip
    # Install the AWS CLI
    sudo ./aws/install --update -i /usr/local/aws-cli -b "$BACALHAU_INSTALL_DIR"
    # Clean up the downloaded zip file and extracted directory
    rm awscliv2.zip
    rm -rf ./aws
}
getSystemInfo() {
    ARCH=$(uname -m)
    case $ARCH in
        armv7*) ARCH="arm" ;;
        aarch64) ARCH="arm64" ;;
        x86_64) ARCH="amd64" ;;
    esac

    OS=$(uname | tr '[:upper:]' '[:lower:]')

    # Most Linux distros need root permission to copy the file to /usr/local/bin
    if [ "$OS" == "linux" ] && [ "$BACALHAU_INSTALL_DIR" == "/usr/local/bin" ]; then
        USE_SUDO="true"
    # Darwin needs permission to copy the file to /usr/local/bin
    elif [ "$OS" == "darwin" ] && [ "$BACALHAU_INSTALL_DIR" == "/usr/local/bin" ]; then
        USE_SUDO="true"
    fi
}
verifySupported() {
    local supported=(linux-amd64 linux-arm64 darwin-amd64 darwin-arm64)
    local current_osarch="${OS}-${ARCH}"

    for osarch in "${supported[@]}"; do
        if [ "$osarch" == "$current_osarch" ]; then
            echo "Your system is ${OS}_${ARCH}"
            return
        fi
    done

    echo "No prebuilt binary for ${current_osarch}"
    exit 1
}
runAsRoot() {
    local CMD="$*"

    if [ $EUID -ne 0 ] && [ "$USE_SUDO" = "true" ]; then
        CMD="sudo $CMD"
    fi

    $CMD
}
checkHttpRequestCLI() {
    if type "curl" > /dev/null; then
        BACALHAU_HTTP_REQUEST_CLI=curl
    elif type "wget" > /dev/null; then
        BACALHAU_HTTP_REQUEST_CLI=wget
    else
        echo "Either curl or wget is required"
        exit 1
    fi
}
checkExistingBacalhau() {
    if [ -f "$BACALHAU_CLI_FILE" ]; then
        echo -e "\nBACALHAU CLI is detected:"
        $BACALHAU_CLI_FILE version
        echo -e "Reinstalling BACALHAU CLI - ${BACALHAU_CLI_FILE}..."
    else
        echo -e "No BACALHAU detected. Installing fresh BACALHAU CLI..."
    fi
}
getLatestRelease() {
    # /latest ignores pre-releases, see https://docs.github.com/en/rest/releases/releases#get-the-latest-release
    local bacalhauReleaseUrl
    if [ "$PRE_RELEASE" == "true" ]; then
        echo "Installing most recent pre-release version..."
        bacalhauReleaseUrl="https://api.github.com/repos/${GITHUB_ORG}/${GITHUB_REPO}/releases"
    else
        bacalhauReleaseUrl="https://api.github.com/repos/${GITHUB_ORG}/${GITHUB_REPO}/releases/latest"
    fi
    local latest_release=""

    if [ "$BACALHAU_HTTP_REQUEST_CLI" == "curl" ]; then
        latest_release=$(curl -s "$bacalhauReleaseUrl" | grep \"tag_name\" | grep -E -i '"v[0-9]+\.[0-9]+\.[0-9]+"' | grep -v rc | awk 'NR==1{print $2}' | sed -n 's/\"\(.*\)\",/\1/p')
    else
        latest_release=$(wget -q --header="Accept: application/json" -O - "$bacalhauReleaseUrl" | grep \"tag_name\" | grep -E -i '^v[0-9]+\.[0-9]+\.[0-9]+$' | grep -v rc | awk 'NR==1{print $2}' | sed -n 's/\"\(.*\)\",/\1/p')
    fi

    ret_val=$latest_release
}
# --- create temporary directory and cleanup when done ---
setup_tmp() {
    BACALHAU_TMP_ROOT=$(mktemp -d 2>/dev/null || mktemp -d -t 'bacalhau-install.XXXXXXXXXX')
    cleanup() {
        code=$?
        set +e
        trap - EXIT
        rm -rf "${BACALHAU_TMP_ROOT}"
        exit $code
    }
    trap cleanup INT EXIT
}
downloadFile() {
    LATEST_RELEASE_TAG=$1

    BACALHAU_CLI_ARTIFACT="${BACALHAU_CLI_FILENAME}_${LATEST_RELEASE_TAG}_${OS}_${ARCH}.tar.gz"
    BACALHAU_SIG_ARTIFACT="${BACALHAU_CLI_ARTIFACT}.signature.sha256"
    DOWNLOAD_BASE="https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download"
    CLI_DOWNLOAD_URL="${DOWNLOAD_BASE}/${LATEST_RELEASE_TAG}/${BACALHAU_CLI_ARTIFACT}"
    SIG_DOWNLOAD_URL="${DOWNLOAD_BASE}/${LATEST_RELEASE_TAG}/${BACALHAU_SIG_ARTIFACT}"

    CLI_TMP_FILE="$BACALHAU_TMP_ROOT/$BACALHAU_CLI_ARTIFACT"
    SIG_TMP_FILE="$BACALHAU_TMP_ROOT/$BACALHAU_SIG_ARTIFACT"

    echo "Downloading $CLI_DOWNLOAD_URL ..."
    if [ "$BACALHAU_HTTP_REQUEST_CLI" == "curl" ]; then
        curl -SsLN "$CLI_DOWNLOAD_URL" -o "$CLI_TMP_FILE"
    else
        wget -q -O "$CLI_TMP_FILE" "$CLI_DOWNLOAD_URL"
    fi

    if [ ! -f "$CLI_TMP_FILE" ]; then
        echo "failed to download $CLI_DOWNLOAD_URL ..."
        exit 1
    fi

    echo "Downloading sig file $SIG_DOWNLOAD_URL ..."
    if [ "$BACALHAU_HTTP_REQUEST_CLI" == "curl" ]; then
        curl -SsLN "$SIG_DOWNLOAD_URL" -o "$SIG_TMP_FILE"
    else
        wget -q -O "$SIG_TMP_FILE" "$SIG_DOWNLOAD_URL"
    fi

    if [ ! -f "$SIG_TMP_FILE" ]; then
        echo "failed to download $SIG_DOWNLOAD_URL ..."
        exit 1
    fi
}
verifyTarBall() {
    # echo "ROOT: $BACALHAU_TMP_ROOT"
    # echo "Public Key: $BACALHAU_PUBLIC_KEY"
    echo "$BACALHAU_PUBLIC_KEY" > "$BACALHAU_TMP_ROOT/BACALHAU_public_file.pem"

    openssl base64 -d -in "$SIG_TMP_FILE" -out "$SIG_TMP_FILE".decoded
    if openssl dgst -sha256 -verify "$BACALHAU_TMP_ROOT/BACALHAU_public_file.pem" -signature "$SIG_TMP_FILE".decoded "$CLI_TMP_FILE" ; then
        # The command above echoes "Verified Ok"
        return
    else
        echo "Failed to verify signature of tarball."
        exit 1
    fi
}
expandTarball() {
    echo "Extracting tarball ..."
    # echo "Extract tar file - $CLI_TMP_FILE to $BACALHAU_TMP_ROOT"
    tar xzf "$CLI_TMP_FILE" -C "$BACALHAU_TMP_ROOT"
}
verifyBin() {
    # openssl base64 -d -in $BACALHAU_TMP_ROOT/bacalhau.signature.sha256 -out $BACALHAU_TMP_ROOT/bacalhau.signature.sha256.decoded
    # if openssl dgst -sha256 -verify "$BACALHAU_TMP_ROOT/BACALHAU_public_file.pem" -signature $BACALHAU_TMP_ROOT/bacalhau.signature.sha256.decoded $BACALHAU_TMP_ROOT/bacalhau; then
    #     return
    # else
    #     echo "Failed to verify signature of bacalhau binary."
    #     exit 1
    # fi
    echo "NOT verifying Bin"
}
installFile() {
    local tmp_root_bacalhau_cli="$BACALHAU_TMP_ROOT/$BACALHAU_CLI_FILENAME"

    if [ ! -f "$tmp_root_bacalhau_cli" ]; then
        echo "Failed to unpack BACALHAU CLI executable."
        exit 1
    fi

    chmod o+x "$tmp_root_bacalhau_cli"
    if [ -f "$BACALHAU_INSTALL_DIR/$BACALHAU_CLI_FILENAME" ]; then
        runAsRoot rm -f "$BACALHAU_INSTALL_DIR/$BACALHAU_CLI_FILENAME"
    fi
    runAsRoot cp "$tmp_root_bacalhau_cli" "$BACALHAU_INSTALL_DIR"

    if [ -f "$BACALHAU_CLI_FILE" ]; then
        echo "$BACALHAU_CLI_FILENAME installed into $BACALHAU_INSTALL_DIR successfully."
        $BACALHAU_CLI_FILE version
    else
        echo "Failed to install $BACALHAU_CLI_FILENAME"
        exit 1
    fi
}
fail_trap() {
    result=$?
    if [ "$result" != "0" ]; then
        echo "Failed to install BACALHAU CLI"
        echo "For support, go to https://github.com/${GITHUB_ORG}/${GITHUB_REPO}"
    fi
    cleanup
    exit $result
}
install_symato_contribute() {
    # Generate the symato-contribute helper that configures and starts a compute node
    cat << EOF > /usr/local/bin/symato-contribute
#!/bin/bash
CONFIG_FILE=~/.config/symato/bacalhau.conf

# Create the configuration directory if it doesn't exist
mkdir -p \$(dirname \$CONFIG_FILE)

# Load configuration from file
if [ -f "\$CONFIG_FILE" ]; then
    source \$CONFIG_FILE
fi

echo " ##### # # # # # ####### ####### "
echo " # # # # ## ## # # # # # "
echo " # # # # # # # # # # # # "
echo " ##### # # # # # # # # # "
echo " # # # # ####### # # # "
echo " # # # # # # # # # # "
echo " ##### # # # # # # ####### "

# Prompt for PEER_ADDR and SWARM_ADDR if they're not set
if [ -z "\$PEER_ADDR" ]; then
    read -p "Enter PEER_ADDR: " PEER_ADDR
    echo "PEER_ADDR=\$PEER_ADDR" >> \$CONFIG_FILE
fi

if [ -z "\$SWARM_ADDR" ]; then
    read -p "Enter SWARM_ADDR: " SWARM_ADDR
    echo "SWARM_ADDR=\$SWARM_ADDR" >> \$CONFIG_FILE
fi

# Prompt for CPU and MEM limits if they're not set
if [ -z "\$CPU" ]; then
    read -p "Total CPU core limit to run all jobs (e.g. 500m, 2, 8): " CPU
    echo "CPU=\$CPU" >> \$CONFIG_FILE
fi

if [ -z "\$MEM" ]; then
    read -p "Total memory limit to run all jobs (e.g. 500Mb, 2Gb, 8Gb): " MEM
    echo "MEM=\$MEM" >> \$CONFIG_FILE
fi

if [ -z "\$AWS_ACCESS_KEY_ID" ]; then
    read -p "Enter your AWS Access Key for S3 fast download: " AWS_ACCESS_KEY_ID
    echo "AWS_ACCESS_KEY_ID=\$AWS_ACCESS_KEY_ID" >> \$CONFIG_FILE
fi

if [ -z "\$AWS_SECRET_ACCESS_KEY" ]; then
    read -p "Enter your AWS Secret Key: " AWS_SECRET_ACCESS_KEY
    echo "AWS_SECRET_ACCESS_KEY=\$AWS_SECRET_ACCESS_KEY" >> \$CONFIG_FILE
fi

# Start a Bacalhau compute node with the configured limits
bacalhau serve --node-type compute \
    --private-internal-ipfs --peer \$PEER_ADDR \
    --ipfs-swarm-addr \$SWARM_ADDR \
    --limit-job-cpu \$CPU \
    --limit-job-memory \$MEM \
    --limit-total-cpu \$CPU \
    --limit-total-memory \$MEM \
    --job-selection-accept-networked
EOF

    # Make the helper executable
    chmod +x /usr/local/bin/symato-contribute
}
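# Note: the generated symato-contribute helper prompts for PEER_ADDR, SWARM_ADDR,
# CPU/MEM limits and AWS credentials on first run, persists them in
# ~/.config/symato/bacalhau.conf, and then starts "bacalhau serve" as a compute
# node with those limits.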
cleanup() {
    if [[ -d "${BACALHAU_TMP_ROOT:-}" ]]; then
        rm -rf "$BACALHAU_TMP_ROOT"
    fi
}
installCompleted() {
    echo " ##### # # # # # ####### ####### "
    echo " # # # # ## ## # # # # # "
    echo " # # # # # # # # # # # # "
    echo " ##### # # # # # # # # # "
    echo " # # # # ####### # # # "
    echo " # # # # # # # # # # "
    echo " ##### # # # # # # ####### "
    echo "Thank you for your willingness to join hands with the Symato Community!"
    echo "On your terminal, run \"symato-contribute\" to get started!"
}
# -----------------------------------------------------------------------------
# main
# -----------------------------------------------------------------------------
trap "fail_trap" EXIT
getSystemInfo
verifySupported
checkExistingBacalhau
checkHttpRequestCLI
if [ -z "$1" ]; then
echo "Getting the latest BACALHAU CLI..."
getLatestRelease
else
ret_val=v$1
fi
if [ -z "$ret_val" ]; then
echo 1>&2 "Error getting latest release..."
exit 1
fi
echo "Installing $ret_val BACALHAU CLI..."
setup_tmp
installAWSCLI
installDocker
downloadFile "$ret_val"
verifyTarBall
expandTarball
verifyBin
installFile
install_symato_contribute
cleanup
installCompleted