Skip to content

Instantly share code, notes, and snippets.

@lostdesign
Forked from ache051/combine_mbtiles.py
Created November 27, 2022 08:27
Show Gist options
  • Save lostdesign/8a283bc44778ceeb5baa2b653f439af0 to your computer and use it in GitHub Desktop.
Save lostdesign/8a283bc44778ceeb5baa2b653f439af0 to your computer and use it in GitHub Desktop.
Combine MBTiles
#-------------------------------------------------------------------------------
# Name: combine_mbtiles.py
# Purpose: Merges multiple SQLite .mbtiles files into a single master .mbtiles file (splitting by level or by degree grid is an open idea, not implemented)
#
# Author: joshn. Modified by ache015
#
# Created: 18/11/2014
#
# Last Modified: 10/11/2018
#-------------------------------------------------------------------------------
import os, sys, datetime, time, math, sqlite3, shutil
#--------------------------------Config-----------------------------------------
# Directory holding the .mbtiles files; main() changes into it before scanning.
sourcePath = "/path/to/mbtiles/"
# The master file is the destination database that every other database is merged into.
masterFile = "master.mbtiles"
# Only files with this extension (compared case-insensitively) are considered for merging.
fileExtension = ".mbtiles"
# Files whose name contains this text are left out of the merge.
filesToSkipTxt = "11_15" #not required for this purpose
# NOTE(review): not referenced by any code visible in this file - confirm before removing.
logLineNumCharPadding = 20
#-------------------------------------------------------------------------------
def main():
    """Run the whole merge: locate the source databases and fold them into the master.

    Changes the working directory to ``sourcePath`` so that the master file
    and the discovered files can be addressed by relative name, then logs
    progress before and after the merge.
    """
    log("Start")
    os.chdir(sourcePath)

    # Collect the sqlite .mbtiles databases that should be merged.
    pending = getFilesToProcess()
    log("{0} files to merge".format(len(pending)))

    # Fold every collected database into the master database.
    mergeFiles(pending)

    log("")
    log("Complete")
def getFilesToProcess():
    """Return the relative paths of the .mbtiles files that should be merged.

    Walks the current working directory recursively and keeps every file
    whose extension matches ``fileExtension`` (case-insensitively), whose
    name does not contain ``filesToSkipTxt`` and whose name is not
    ``masterFile``.

    Returns:
        A list of paths relative to the current working directory. Files
        directly in the working directory are returned as bare names; files
        found in subdirectories keep their directory part so callers can
        actually open them.
    """
    wantedExt = fileExtension.lower()
    filesToProcess = []
    for root, _dirs, names in os.walk("."):
        for name in names:
            # Check the file extension and the skip-file text
            ext = os.path.splitext(name)[1]
            if ext.lower() != wantedExt:
                continue
            # An empty skip text must not match everything ("" is in every string).
            if filesToSkipTxt and filesToSkipTxt in name:
                continue
            if name == masterFile:
                continue
            # os.walk recurses, so the bare name alone is not enough to open a
            # file found in a subdirectory; normpath drops the leading "./".
            filesToProcess.append(os.path.normpath(os.path.join(root, name)))
    return filesToProcess
def _mergeSourceIntoMaster(conDest, curDest, dbFile):
    """Attach ``dbFile`` to the master connection and copy its map/images rows.

    Both REPLACE statements run in one transaction so the master never ends
    up with map rows from a source whose images failed to copy. The attached
    database is always detached again, even on failure, so the alias
    ``source`` is free for the next file.
    """
    # Bind the file name as a parameter so quotes in the path cannot break the SQL.
    curDest.execute("ATTACH DATABASE ? AS source", (dbFile,))
    try:
        curDest.execute("REPLACE INTO map SELECT * FROM source.map")
        curDest.execute("REPLACE INTO images SELECT * FROM source.images")
        conDest.commit()
    except Exception:
        conDest.rollback()
        raise
    finally:
        # DETACH is not allowed inside an open transaction; by now the
        # transaction has been committed or rolled back.
        curDest.execute("DETACH DATABASE source")


def mergeFiles(files):
    """Merge every database in ``files`` into the master database.

    A backup of the master file is created first (if none exists yet). Each
    source file is then attached to the master connection and its ``map``
    and ``images`` rows are written with REPLACE. A failure on one file is
    logged and the remaining files are still processed.

    Args:
        files: iterable of paths to source .mbtiles databases.
    """
    # Backup the masterfile
    backupFile(masterFile, "backup")

    # Connect to the master db, which is the destination of the merge
    conDest = mbtilesConnect(masterFile)
    try:
        curDest = conDest.cursor()
        optimizeConnection(curDest)
        log("master{0}:\t\t{1}".format("\t"*3, getFileSize(os.path.getsize(masterFile))))

        for dbFile in files:
            log("{0}\t({1})".format(dbFile[:11], getFileSize(os.path.getsize(dbFile))))
            # Replace rows from the source db into the master db. No separate
            # connection to the source is needed: ATTACH reads it directly.
            try:
                _mergeSourceIntoMaster(conDest, curDest, dbFile)
            except Exception as e:
                log(e)
            log("master{0}:\t\t{1}\n".format("\t"*3, getFileSize(os.path.getsize(masterFile))))
    finally:
        # Release the exclusive lock on the master file.
        conDest.close()
def mbtilesConnect(mbtilesFile):
    """Open a sqlite3 connection to ``mbtilesFile``.

    Args:
        mbtilesFile: path of the database file to open.

    Returns:
        The open ``sqlite3`` connection.

    If connecting raises, the error is logged and the process exits with
    status 1.
    """
    try:
        connection = sqlite3.connect(mbtilesFile)
    except Exception as err:
        log("ERROR Could not connect to database")
        log(err)
        sys.exit(1)
    else:
        return connection
def optimizeConnection(cur):
    """Apply speed-oriented PRAGMA settings through ``cur``.

    Args:
        cur: any object with an ``execute`` method bound to a sqlite
            database (a cursor or a connection).
    """
    # http://www.sqlite.org/pragma.html#pragma_synchronous
    # Some operations are as much as 50 or more times faster with synchronous OFF
    # (but more risky if failure)
    pragmaStatements = (
        """PRAGMA synchronous=0""",
        """PRAGMA locking_mode=EXCLUSIVE""",
        """PRAGMA journal_mode=DELETE""",
    )
    for statement in pragmaStatements:
        cur.execute(statement)
def backupFile(fileName, backupAppendName):
    """Copy ``fileName`` to ``<fileName>_<backupAppendName>`` unless that copy exists.

    Args:
        fileName: path of the file to back up.
        backupAppendName: suffix appended (after an underscore) to build the
            backup file name.

    A failure while copying is logged and not re-raised.
    """
    target = "{0}_{1}".format(fileName, backupAppendName)

    # Never overwrite an existing backup.
    if os.path.exists(target):
        log("Backup '{0}' already exists.\n".format(target))
        return

    log("Creating a backup file '{0}'...".format(target))
    try:
        shutil.copyfile(fileName, target)
        log("Backup created\n")
    except Exception as err:
        log(err)
def getFileSize(size):
    """Format a byte count as a human-readable string such as ``'1.5 KB'``.

    Args:
        size: number of bytes.

    Returns:
        ``'<value> <unit>'`` with the value rounded to two decimals and the
        unit chosen in steps of 1024, or ``'0B'`` when ``size`` is zero or
        negative.
    """
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    # A logarithm is undefined for 0, so empty files are handled up front.
    if size <= 0:
        return '0B'

    # Divide by 1024 step by step instead of using math.log: division by a
    # power of two is exact, so exact multiples of 1024 land in the right
    # unit, and the index can never run past the last unit.
    value = float(size)
    i = 0
    while value >= 1024 and i < len(size_name) - 1:
        value /= 1024
        i += 1

    s = round(value, 2)
    if s > 0:
        return '%s %s' % (s, size_name[i])
    return '0B'
def log(msg):
    """Print ``msg`` to standard output, prefixed with the current time.

    Args:
        msg: the message (any object; it is formatted with ``str.format``).

    The line has the form ``HH:MM:SS<TAB>msg``.
    """
    # Call form with a single argument works on both Python 2 and Python 3;
    # the print statement is a syntax error on Python 3.
    print("{0}\t{1}".format(datetime.datetime.now().strftime("%H:%M:%S"), msg))
# Run the merge only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment