Skip to content

Instantly share code, notes, and snippets.

@run-dlang
Created February 12, 2019 07:56
Show Gist options
  • Save run-dlang/42194aa9518e3d722eb580a1d942467c to your computer and use it in GitHub Desktop.
Save run-dlang/42194aa9518e3d722eb580a1d942467c to your computer and use it in GitHub Desktop.
Code shared from run.dlang.io.
import std.stdio;
import std.array;
import std.conv;
import std.string;
import std.path;
import std.file;
import requests;
import arsd.database, arsd.sqlite;
import std.regex;
import core.time : days;
import std.datetime;
import std.algorithm;
import std.zip;
import std.range;
import std.process;
import std.concurrency;
import core.thread;
import std.random;
// ---- Module-level configuration and shared state ----
// ftp_url, ftp_login, ftp_pass and root_ftp_dir are filled in by the module
// constructor (static this) depending on fz_name.
string ftp_url;
string ftp_login;
string ftp_pass;
string root_ftp_dir;
// Running counter printed and incremented by processSingleFile after each XML member.
int count = 0;
// Shared request object and the last response received by the listing helpers.
Request rq;
Response rs;
// Database handle; assigned in main().
Database db;
// Prefix prepended to archive paths when downloading.
string ftp_root_uri = "ftp://ftp.zakupki.gov.ru/";
// Directory main() switches into and where processSingleFile looks for main.py.
string python_script_dir = r"D:\code\2018\zakupki-downloader\source2";
// Folder for downloaded archives; assigned in main() next to the executable.
string files_folder;
string db_name = "my.db";
string fz_name = "44"; // or 223
/// One archive found on the FTP server, as collected by listOfFinalFolders
/// and later inserted into the ftp_files table by saveToDB.
struct MyStruct
{
// Base name of the region directory line returned by the FTP listing.
string region_name;
// Section the archive was found under (one of listOfSectionForProcessing()).
string section_name;
// Archive path exactly as returned by the FTP folder listing.
string full_file_path;
// First 8-digit run found in the archive path (e.g. 20180711).
string arch_date;
}
// Archives accumulated by listOfFinalFolders, flushed to the DB by saveToDB.
MyStruct [] mystructs;
/// Module constructor: selects FTP credentials and the root directory
/// according to fz_name ("44" versus anything else).
static this()
{
    // The host is the same for both cases; only login, password and root differ.
    ftp_url = "ftp.zakupki.gov.ru";

    immutable bool isFz44 = (fz_name == "44");
    ftp_login = isFz44 ? "free" : "fz223free";
    ftp_pass = isFz44 ? "free" : "fz223free";
    root_ftp_dir = isFz44 ? "/fcs_regions/" : "/out/published/";
}
/// Entry point: opens (and, on first run, initialises) the database, switches
/// into the Python script directory and processes every pending archive.
void main()
{
    writeln(thisExePath());
    files_folder = dirName(thisExePath()) ~ `/files/`;

    // Remember whether the database file existed BEFORE opening the connection.
    immutable bool isNewDatabase = !db_name.exists();

    // The connection must be created before init_db(), because init_db()
    // issues its CREATE TABLE statements through the global `db` handle.
    db = new Sqlite(db_name);
    if (isNewDatabase)
    {
        init_db();
    }

    chdir(python_script_dir);

    extractFromDBAndProcess();
    //listOfFinalFolders(); // only loads the FTP file list into the DB
}
/// Sleeps the current thread for a number of seconds drawn from a
/// MinstdRand0 generator seeded with the constant 10.
void foo()
{
    auto generator = MinstdRand0(10);
    immutable ubyte delaySeconds = uniform!ubyte(generator);
    Thread.sleep(seconds(delaySeconds));
}
/// Creates the two working tables if they do not exist yet.
/// Uses the global `db` handle, which must already be open.
void init_db()
{
    // ftp_files: one row per archive on the FTP server.
    // isProcessing is read and set by extractFromDBAndProcess, so it has to be
    // part of the schema; without it those statements fail on a fresh database.
    string sql_create_1 = `CREATE TABLE IF NOT EXISTS ftp_files (ID INTEGER PRIMARY KEY AUTOINCREMENT, region TEXT NOT NULL, section_name TEXT NOT NULL, ftp_file_full_path TEXT NOT NULL UNIQUE, arch_date TEXT NOT NULL, processing_status TEXT, isProcessing INTEGER)`;
    db.query(sql_create_1);

    // processing_files: one row per XML member handed to the Python script.
    string sql_create_2 = `CREATE TABLE IF NOT EXISTS "processing_files"("arch_full_name" Text, "error_message" Text, "status" Text, "id" Integer NOT NULL PRIMARY KEY AUTOINCREMENT, "xml_name" Text );`;
    db.query(sql_create_2);
}
/// Builds the list of archive files (with full paths) for the configured law
/// (fz_name), appends them to `mystructs` and stores them via saveToDB().
void listOfFinalFolders()
{
    rq.verbosity = 3;
    rq.authenticator = new BasicAuthentication(ftp_login, ftp_pass);
    string [] sectionsForProcessing = listOfSectionForProcessing();

    // `else if` matters here: with two independent `if`s the "unknown" branch
    // was also executed for a perfectly valid "223" run.
    if(fz_name == "223")
    {
        collectRegionArchives("ftp://ftp.zakupki.gov.ru/out/published/", "daily", sectionsForProcessing);
    }
    else if(fz_name == "44")
    {
        collectRegionArchives("ftp://ftp.zakupki.gov.ru/fcs_regions/", "currMonth", sectionsForProcessing);
    }
    else
    {
        writeln("Unkdown FZ");
    }
    saveToDB();
}

/// Lists the region folders under rootUrl and, for each wanted region and each
/// section, appends every archive dated 2018 or later to `mystructs`.
/// periodFolder is the sub-folder inside a section ("daily" or "currMonth").
private void collectRegionArchives(string rootUrl, string periodFolder, string [] sections)
{
    // Compiled once instead of once per file name.
    auto dateRegex = regex(r"([0-9]{8})"); // e.g. 20180711

    rs = rq.get(rootUrl);
    // Materialise the listing first: getListOfFolderFiles overwrites the global `rs`.
    string [] regionLines = splitLines(to!string(rs.responseBody));

    foreach (line; regionLines)
    {
        if(!line.startsWith(`/`))
            continue;

        string region = baseName(line); // region name
        if(region.toLower != "moskva") // only Moscow for now
            continue;

        foreach(section; sections)
        {
            string folder_path = rootUrl ~ region ~ `/` ~ section ~ `/` ~ periodFolder ~ `/`;
            foreach(file_full_name; getListOfFolderFiles(folder_path))
            {
                auto str_date = matchFirst(file_full_name, dateRegex);
                // A name without an 8-digit date cannot be classified; skip it
                // instead of failing on an empty match.
                if(str_date.empty)
                    continue;

                if (to!int(str_date.hit[0..4]) >= 2018) // keep only 2018 and later
                {
                    mystructs ~= MyStruct(region, section, file_full_name, str_date.hit);
                }
            }
        }
    }
}
/// Requests the listing of `folder` and returns only those response lines
/// that start with a slash.
string [] getListOfFolderFiles(string folder)
{
    rq.verbosity = 3;
    rq.authenticator = new BasicAuthentication(ftp_login, ftp_pass);
    rs = rq.get(folder);

    return splitLines(to!string(rs.responseBody))
        .filter!(entry => entry.startsWith(`/`))
        .array;
}
/// Inserts every collected entry of `mystructs` into the ftp_files table.
void saveToDB()
{
    writeln("saveToDB");

    enum insertSql = `INSERT INTO ftp_files (region, section_name, ftp_file_full_path, arch_date) VALUES (?, ?, ?, ?);`;
    foreach(index; 0 .. mystructs.length)
    {
        auto entry = mystructs[index];
        db.query(insertSql, entry.region_name, entry.section_name, entry.full_file_path, entry.arch_date);
    }
}
/// Walks over all archives that have no processing status and are not flagged
/// as being processed, flags each one and hands it to downloadFile.
void extractFromDBAndProcess()
{
    auto pending = db.query(`SELECT ID, region, section_name, ftp_file_full_path, arch_date, processing_status FROM ftp_files WHERE processing_status IS NULL AND isProcessing IS NOT True;`);

    foreach(record; pending)
    {
        auto archiveId = record["ID"];
        auto archivePath = record["ftp_file_full_path"];

        // Flag the row first, then download and process the archive.
        db.query(`UPDATE "ftp_files" SET "isProcessing" = True WHERE "ID" = ` ~ archiveId ~ `;`);

        writeln(archivePath);
        downloadFile(archivePath, record["section_name"], to!int(archiveId));
    }
}
/// Stores `status` as the processing_status of the ftp_files row with the given id.
void updateArchiveStatus(string status, int id)
{
    // Bound parameters instead of string formatting: the value is passed as
    // data, so quotes inside `status` cannot alter the statement.
    db.query(`UPDATE "ftp_files" SET "processing_status" = ? WHERE id = ?`, status, id);
}
/// Records the processing result of one XML member in processing_files.
/// When error_message is empty the error_message column is left unset.
void updateXMLStatus(string arch_name, string xml_name, string status, string error_message = "")
{
    // Values are bound as parameters: error messages and file paths may contain
    // single quotes, which broke the previously string-formatted statement.
    if(error_message == "")
    {
        db.query(`INSERT OR REPLACE INTO processing_files(arch_full_name, xml_name, status) VALUES (?, ?, ?)`,
            arch_name, xml_name, status);
    }
    else
    {
        db.query(`INSERT OR REPLACE INTO processing_files(arch_full_name, xml_name, status, error_message) VALUES (?, ?, ?, ?)`,
            arch_name, xml_name, status, error_message);
    }
}
/// Downloads one archive from the FTP server into files_folder, passes it to
/// processSingleFile and removes the local copy afterwards.
/// `id` is the ftp_files row id forwarded to processSingleFile.
void downloadFile(string full_file_path, string section_name, int id )
{
    // Create the folder the archive is actually written to. The previous check
    // used the relative name "files", i.e. whatever the current directory was.
    if(!exists(files_folder))
    {
        mkdirRecurse(files_folder);
    }

    rq.authenticator = new BasicAuthentication(ftp_login, ftp_pass);
    Response rs;
    try
    {
        rs = rq.get(ftp_root_uri ~ `/` ~ full_file_path);
    }
    catch (Exception e)
    {
        writeln("[ERROR] FAILED request to FTP: ", e.msg);
        // Nothing was downloaded and `rs` is still unset; skip this archive.
        return;
    }

    string archive_path = files_folder ~ baseName(full_file_path);
    writeln("archive_path: ", archive_path);
    File f = File(archive_path, "wb");
    f.rawWrite(rs.responseBody.data);
    f.close();

    // Remove the local copy on every exit path, including an exception
    // thrown while the archive is being processed.
    scope(exit)
    {
        if(exists(archive_path))
            archive_path.remove();
    }

    if(getSize(archive_path) < 256)
    {
        writeln("File too small and deleted: ", archive_path);
        return;
    }
    processSingleFile(archive_path, section_name, id);
}
/// Unpacks every relevant XML member of the archive, runs the Python loader
/// (main.py in python_script_dir) on it and records the outcome per member.
/// After all members were handled the archive row `id` is marked "done".
void processSingleFile(string archive_path, string section_name, int id)
{
    auto zip = new ZipArchive(read(archive_path));

    // "name.xml.zip" -> "name": folder the members are extracted into.
    string xml_dir = archive_path.stripExtension.stripExtension;
    string script_path = buildPath(python_script_dir, "main.py");

    foreach (file_name, am; zip.directory)
    {
        if(file_name.canFind("Cancel")) // usually protocols; these are skipped
            continue;
        if(file_name.extension != ".xml")
            continue;

        writeln("file_name: ", file_name);
        writefln("%10s %08x %s", am.expandedSize, am.crc32, file_name);
        assert(am.expandedData.length == 0);
        // decompress the archive member
        auto my_xml = zip.expand(am);

        string xml_full_path = xml_dir ~ `/` ~ file_name;
        // Also creates intermediate folders when the member name contains a sub-path.
        mkdirRecurse(dirName(xml_full_path));

        File f = File(xml_full_path, "wb");
        f.rawWrite(my_xml);
        f.close();
        writeln("xml_full_path: ", xml_full_path);

        // Every argument is escaped, so paths with spaces or shell
        // metacharacters reach the script as one argument each.
        auto result = executeShell(escapeShellCommand("python", script_path, xml_full_path, section_name));

        if(result.status == 0)
        {
            // the message column is filled with "success" too, for readability
            updateXMLStatus(archive_path, file_name, "success", "success");
        }
        else
        {
            // Look for the reason of the failure; the last matching line wins.
            string error_text;
            foreach(line; result.output.splitLines())
            {
                if (line.canFind("ORA"))
                    error_text = line;
            }
            // An empty error_text makes updateXMLStatus omit the message column.
            updateXMLStatus(archive_path, file_name, "fail", error_text);
            writeln("----------------------------");
        }
        writeln("\ncount: ", count++);
    }
    updateArchiveStatus("done", id); // all members of the archive were handled
}
/// Returns the section names to process for the configured law (fz_name);
/// the result is empty for any other value.
string [] listOfSectionForProcessing()
{
    switch (fz_name)
    {
        case "44":
            return ["protocols", "contracts", "notifications"];
        case "223":
            // Other known sections, currently disabled: "purchaseNoticeAE", "purchaseProtocol".
            return ["purchaseContract", "purchaseNotice"];
        default:
            return null;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment