Skip to content

Instantly share code, notes, and snippets.

@andering
Last active January 12, 2020 02:33
Show Gist options
  • Save andering/f517d72f33b0eff6348d07b429b9e9f7 to your computer and use it in GitHub Desktop.
Save andering/f517d72f33b0eff6348d07b429b9e9f7 to your computer and use it in GitHub Desktop.
test/ean.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
Import EANs from CSV files.

The first row of each CSV file names the filter to apply to each column;
every following row is run through those filters and matched against the
product table.
"""
import sys
import os
import csv
import functools
import pymysql
import time
from pyparsing import Word, alphas, OneOrMore, Suppress, ZeroOrMore, LineStart, LineEnd, oneOf, Optional, alphanums, Keyword, Group, Empty, NoMatch, nums, ParseException
class Pymysql:
    """Thin wrapper around a PyMySQL connection to the ``product`` table.

    Rows are returned as dictionaries (``DictCursor``).
    """

    def __init__(self, host='localhost', user='root', password='Nimda159--',
                 db='sportfot_new', charset='utf8mb4'):
        """Open the database connection.

        All parameters are forwarded to ``pymysql.connect``. The defaults
        reproduce the values that used to be hard-coded here, so a bare
        ``Pymysql()`` call behaves as before.

        NOTE(review): a real password should not live in source control;
        callers should pass credentials in from configuration or the
        environment instead of relying on these defaults.
        """
        self.connection = pymysql.connect(host=host,
                                          user=user,
                                          password=password,
                                          db=db,
                                          charset=charset,
                                          cursorclass=pymysql.cursors.DictCursor)

    def query(self, query, data=()):
        """Execute ``query`` with the bound parameters ``data``.

        Returns the cursor the statement ran on. The cursor has already been
        closed by the ``with`` block when the caller receives it; callers in
        this file still read ``rowcount`` / ``fetchone()`` from it.
        NOTE(review): this presumably relies on PyMySQL's default cursors
        buffering the whole result set client-side -- confirm before
        switching to an unbuffered cursor class.
        """
        with self.connection.cursor() as cursor:
            cursor.execute(query, data)
            return cursor

    def put(self, generated_ean, ean):
        """Set ``ean`` on the product row(s) with the given ``generated_ean``.

        The update is committed immediately. Returns the cursor used.
        """
        cursor = self.query(
            '''UPDATE product SET ean = %s WHERE generated_ean = %s''',
            (ean, generated_ean))
        self.connection.commit()
        return cursor

    def get(self, manufacturer_code, color_code, size_code):
        """Look up products matching a manufacturer/colour/size triple.

        A row matches when its ``user_code`` equals ``manufacturer_code`` or
        starts with ``manufacturer_code + "-"``, and its ``generated_ean``
        starts with ``manufacturer_code:color_code`` followed directly by
        ``:`` and later contains ``:size_code`` followed by ``:`` or the end
        of the string.

        Returns the cursor; each row holds ``ean`` and ``generated_ean``.

        NOTE(review): the three codes are interpolated into the regular
        expression unescaped, so any regex metacharacter in them (for
        example ``.``) is treated as pattern syntax, not as a literal.
        """
        regexp = f"^{manufacturer_code}:{color_code}(?=:).*(:){size_code}(:|$).*$"
        cursor = self.query(
            '''SELECT ean, generated_ean FROM product WHERE (user_code = %s OR user_code LIKE %s) AND generated_ean REGEXP %s''',
            (manufacturer_code, manufacturer_code + "-%", regexp))
        return cursor
class Filters:
    """Column filters selected by the keywords in a CSV header row.

    ``self.filters`` holds one pyparsing ``Keyword`` per supported column
    name. When a keyword matches in the header, its parse action replaces
    the keyword with a cell parser (a ``Word``) whose own parse action is
    the method of the same name on this class. Those methods follow the
    pyparsing parse-action signature ``(s, l, t)`` and only look at
    ``t[0]``, the matched cell text; each returns a dict of the fields it
    contributes to the row.
    """

    # Supported header keywords, in the order they are tried. Each name is
    # also the name of the method that handles cells of that column.
    COLUMN_NAMES = (
        "adidas_code",
        "nike_code",
        "reusch_code",
        "uhlsport_code",
        "geco_code",
        "color_code",
        "size_code",
        "ean",
    )

    def __init__(self):
        """Build the list of header-keyword parsers in ``self.filters``."""
        cell_chars = alphanums + "-/."

        def cell_parser_for(handler):
            # A factory gives every keyword its own ``handler`` binding; a
            # plain lambda in the loop below would late-bind to the last one.
            def build_cell_parser(s, l, t):
                # A fresh Word per header column, because setParseAction
                # mutates the element it is called on.
                return Word(cell_chars).setParseAction(handler)
            return build_cell_parser

        self.filters = [
            Keyword(name).setParseAction(cell_parser_for(getattr(self, name)))
            for name in self.COLUMN_NAMES
        ]

    def adidas_code(self, s, l, t):
        """Use the whole cell as both manufacturer code and colour code."""
        return {'manufacturer_code': t[0], 'color_code': t[0]}

    def nike_code(self, s, l, t):
        """Use the whole cell as the manufacturer code."""
        return {'manufacturer_code': t[0]}

    def reusch_code(self, s, l, t):
        """Use the whole cell as the manufacturer code."""
        return {'manufacturer_code': t[0]}

    def uhlsport_code(self, s, l, t):
        """Use the whole cell as manufacturer code, its last two characters as colour."""
        return {'manufacturer_code': t[0], 'color_code': t[0][-2:]}

    def geco_code(self, s, l, t):
        """Split a combined code into manufacturer, colour and size parts.

        The cell must consist of exactly five alphanumeric characters, then
        a run of digits (colour), then a run of letters (size).

        Raises:
            ParseException: if any of the three parts is missing. The parts
                used to be optional but were then read by position, so a
                code without a colour reported its size as the colour and
                any missing part failed with an IndexError from inside the
                parse action. Requiring all three makes an incomplete code
                an ordinary parse failure.
        """
        grammar = Word(alphanums, exact=5) + Word(nums) + Word(alphas)
        manufacturer, color, size = grammar.parseString(t[0])
        return {'manufacturer_code': manufacturer, 'color_code': color, 'size_code': size}

    def color_code(self, s, l, t):
        """Use the whole cell as the colour code."""
        return {'color_code': t[0]}

    def size_code(self, s, l, t):
        """Use the whole cell as the size code."""
        return {'size_code': t[0]}

    def ean(self, s, l, t):
        """Use the whole cell as the EAN."""
        return {'ean': t[0]}
class Ean:
    """Read CSV files and write their EANs into matching product rows."""

    def __init__(self):
        """Open the database connection and build the column filters."""
        self.db = Pymysql()
        self.filters = Filters()

    def OR(self, list):
        """Combine parsers into one that matches any of them (``|``).

        ``list`` keeps its original name (it shadows the builtin) so keyword
        callers keep working. An empty iterable yields ``NoMatch()``.
        """
        combined = NoMatch()
        for alternative in list:
            combined |= alternative
        return combined

    def AND(self, list):
        """Combine parsers into one that matches them in sequence (``+``).

        ``list`` keeps its original name (it shadows the builtin) so keyword
        callers keep working. An empty iterable yields ``Empty()``.
        """
        sequence = Empty()
        for part in list:
            sequence += part
        return sequence

    def get_csvs(self, path):
        """Yield the ``os.DirEntry`` of every entry in ``path`` whose name ends in ``.csv``."""
        with os.scandir(path) as entries:
            for entry in entries:
                if entry.name.endswith(".csv"):
                    yield entry

    def parse_csv(self, f_input):
        """Process the lines of one CSV file and yield a status per data row.

        The first line is the header: a comma-separated list of filter
        keywords. It is turned into the grammar every later line is parsed
        with. Each later line is yielded back prefixed with its state and a
        comma; the header line itself produces no output.

        Args:
            f_input: iterable of text lines (for example an open file).

        Yields:
            ``state + ',' + line`` for every line after the first.

        Raises:
            ParseException: if the header line does not parse.
            KeyError: if a parsed row lacks ``manufacturer_code``,
                ``color_code`` or ``size_code``, or lacks ``ean`` when
                exactly one product matches.
        """
        row_grammar = None
        for index, line in enumerate(f_input):
            if index == 0:
                # Every header keyword (and optional comma) is replaced by
                # the parser for that column's cells via its parse action.
                separator = Optional(",").setParseAction(
                    lambda s, l, t: Optional(Suppress(",")))
                column = self.OR(self.filters.filters) + separator
                header_grammar = (Suppress(LineStart())
                                  + OneOrMore(column)
                                  + Suppress(LineEnd()))
                cell_parsers = header_grammar.parseString(line)
                # Built once here so it is not reassembled for every row.
                row_grammar = (Suppress(LineStart())
                               + self.AND(cell_parsers)
                               + Suppress(LineEnd()))
                continue

            try:
                cells = row_grammar.parseString(line)
            except ParseException:
                state = 'WRONG FORMAT'
            else:
                # Each cell parser returned a dict; merge them into one row.
                values = {key: value for cell in cells for key, value in cell.items()}
                state = self._resolve_state(values)
            yield state + ',' + line

    def _resolve_state(self, values):
        """Look the row up in the database, store its EAN if due, and return its state."""
        result = self.db.get(values['manufacturer_code'],
                             values['color_code'],
                             values['size_code'])
        if result.rowcount > 1:
            return 'MORE THAN ONE MATCH'
        if result.rowcount != 1:
            return 'NO MATCH'

        product = result.fetchone()
        if not values['ean']:
            return 'MISSING EAN'
        if not product['ean']:
            # The product has no EAN yet: store the one from the CSV.
            self.db.put(product['generated_ean'], values['ean'])
            return 'UPDATED'
        if product['ean'] == values['ean']:
            return 'AS IS'
        return 'EAN IN COLISION'
def main(input_dir='input/'):
    """Process every ``.csv`` file in ``input_dir``.

    For each file the path is printed and a report is written next to it
    as ``<file>.txt``, holding every data row prefixed with its state.

    Args:
        input_dir: directory to scan; defaults to the previously
            hard-coded ``'input/'``.
    """
    ean = Ean()
    for f_csv in ean.get_csvs(input_dir):
        print(f_csv.path)
        # errors='ignore' drops undecodable bytes instead of aborting.
        with open(f_csv.path, 'r', errors='ignore') as f_input, \
                open(f_csv.path + ".txt", 'w') as f_output:
            f_output.writelines(ean.parse_csv(f_input))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment