Last active
January 12, 2020 02:33
-
-
Save andering/f517d72f33b0eff6348d07b429b9e9f7 to your computer and use it in GitHub Desktop.
test/ean.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty File |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: UTF-8 -*- | |
""" | |
Import EANs from csv files. | |
Import EANs from csv files and run each column through filter by header column definition | |
""" | |
import sys | |
import os | |
import csv | |
import functools | |
import pymysql | |
import time | |
from pyparsing import Word, alphas, OneOrMore, Suppress, ZeroOrMore, LineStart, LineEnd, oneOf, Optional, alphanums, Keyword, Group, Empty, NoMatch, nums, ParseException | |
class Pymysql: | |
def __init__(self): | |
self.connection = pymysql.connect(host='localhost', | |
user='root', | |
password='Nimda159--', | |
db='sportfot_new', | |
charset='utf8mb4', | |
cursorclass=pymysql.cursors.DictCursor) | |
def query(self, query, data=()): | |
with self.connection.cursor() as cursor: | |
cursor.execute(query, data) | |
return cursor | |
def put(self, generated_ean, ean): | |
cursor = self.query('''UPDATE product SET ean = %s WHERE generated_ean = %s''', (ean,generated_ean)) | |
self.connection.commit() | |
return cursor | |
def get(self,manufacturer_code,color_code,size_code): | |
regexp = f"^{manufacturer_code}:{color_code}(?=:).*(:){size_code}(:|$).*$" | |
cursor = self.query('''SELECT ean, generated_ean FROM product WHERE (user_code = %s OR user_code LIKE %s) AND generated_ean REGEXP %s''', (manufacturer_code,manufacturer_code + "-%",regexp)) | |
return cursor | |
class Filters: | |
def adidas_code(self, s, l, t): | |
return {'manufacturer_code':t[0], 'color_code':t[0]} | |
def nike_code(self, s, l, t): | |
return {'manufacturer_code':t[0]} | |
def reusch_code(self, s, l, t): | |
return {'manufacturer_code':t[0]} | |
def uhlsport_code(self, s, l, t): | |
return {'manufacturer_code':t[0], 'color_code':t[0][-2:]} | |
def geco_code(self, s, l, t): | |
p = Word(alphanums,exact=5) + Optional(Word(nums)) + Optional(Word(alphas)) | |
p = p.parseString(t[0]) | |
return {'manufacturer_code':p[0], 'color_code':p[1], 'size_code':p[2]} | |
def color_code(self, s, l, t): | |
return {'color_code':t[0]} | |
def size_code(self, s, l, t): | |
return {'size_code':t[0]} | |
def ean(self, s, l, t): | |
return {'ean':t[0]} | |
def __init__(self): | |
self.filters = [] | |
self.filters.append(Keyword("adidas_code").setParseAction( | |
lambda s, l, t: Word(alphanums + "-/.").setParseAction(self.adidas_code))) | |
self.filters.append(Keyword("nike_code").setParseAction( | |
lambda s, l, t: Word(alphanums + "-/.").setParseAction(self.nike_code))) | |
self.filters.append(Keyword("reusch_code").setParseAction( | |
lambda s, l, t: Word(alphanums + "-/.").setParseAction(self.reusch_code))) | |
self.filters.append(Keyword("uhlsport_code").setParseAction( | |
lambda s, l, t: Word(alphanums + "-/.").setParseAction(self.uhlsport_code))) | |
self.filters.append(Keyword("geco_code").setParseAction( | |
lambda s, l, t: Word(alphanums + "-/.").setParseAction(self.geco_code))) | |
self.filters.append(Keyword("color_code").setParseAction( | |
lambda s, l, t: Word(alphanums + "-/.").setParseAction(self.color_code))) | |
self.filters.append(Keyword("size_code").setParseAction( | |
lambda s, l, t: Word(alphanums + "-/.").setParseAction(self.size_code))) | |
self.filters.append(Keyword("ean").setParseAction( | |
lambda s, l, t: Word(alphanums + "-/.").setParseAction(self.ean))) | |
class Ean: | |
def __init__(self): | |
self.db = Pymysql() | |
self.filters = Filters() | |
def OR(self,list): | |
OR = NoMatch() | |
for l in list: | |
OR |= l | |
return OR | |
def AND(self,list): | |
OR = Empty() | |
for l in list: | |
OR += l | |
return OR | |
def get_csvs(self,path): | |
with os.scandir(path) as files: | |
for file in files: | |
if file.name.endswith(".csv"): | |
yield file | |
def parse_csv(self,f_input): | |
mask = None | |
for i, line in enumerate(f_input): | |
if i == 0: | |
mask = (Suppress(LineStart()) + OneOrMore(self.OR(self.filters.filters) + Optional(",").setParseAction(lambda s, l, t: Optional(Suppress(",")))) + Suppress(LineEnd())).parseString(line) | |
mask = self.AND(mask) | |
else: | |
try: | |
values = (Suppress(LineStart()) + mask + Suppress(LineEnd())).parseString(line) | |
values = {k:v for d in values for k,v in d.items()} | |
result = self.db.get(values['manufacturer_code'],values['color_code'],values['size_code']); | |
if result.rowcount == 1: | |
result = result.fetchone() | |
state = 'UNKNOWN STATE' | |
if not values['ean']: | |
state = 'MISSING EAN' | |
elif not result['ean'] and values['ean']: | |
self.db.put(result['generated_ean'],values['ean']) | |
state = 'UPDATED' | |
elif result['ean'] == values['ean']: | |
state = 'AS IS' | |
elif result['ean'] != values['ean']: | |
state = 'EAN IN COLISION' | |
elif result.rowcount > 1: | |
state = 'MORE THAN ONE MATCH' | |
else: | |
state = 'NO MATCH' | |
except ParseException: | |
state = 'WRONG FORMAT' | |
yield state + ',' + line | |
def main(): | |
ean = Ean() | |
for f_csv in ean.get_csvs('input/'): | |
print(f_csv.path) | |
with open(f_csv.path, 'r', errors='ignore') as f_input, open(f_csv.path + ".txt", 'w') as f_output: | |
for i,line in enumerate(ean.parse_csv(f_input)): | |
f_output.write(line) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment