Last active
March 7, 2019 18:38
-
-
Save omargourari/f13e1be16a567a1efd27522b155a062c to your computer and use it in GitHub Desktop.
Example scritp x Zeep client library AttributeError: _tls
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import json | |
import os | |
import pathvalidate | |
import pandas as pd | |
import requests | |
import application as app | |
from flask import jsonify, request | |
from flask.views import MethodView | |
from zeep import Client, helpers | |
from zeep.transports import Transport | |
from lxml.html import fromstring | |
from zeep.cache import SqliteCache | |
import logging.config | |
logging.config.dictConfig({ | |
'version': 1, | |
'formatters': { | |
'verbose': { | |
'format': '%(name)s: %(message)s' | |
} | |
}, | |
'handlers': { | |
'console': { | |
'level': 'DEBUG', | |
'class': 'logging.StreamHandler', | |
'formatter': 'verbose', | |
}, | |
}, | |
'loggers': { | |
'zeep.transports': { | |
'level': 'DEBUG', | |
'propagate': True, | |
'handlers': ['console'], | |
}, | |
} | |
}) | |
def get_proxies(): | |
url = 'https://free-proxy-list.net/' | |
response = requests.get(url) | |
parser = fromstring(response.text) | |
proxies = set() | |
for i in parser.xpath('//tbody/tr')[:10]: | |
if i.xpath('.//td[7][contains(text(),"yes")]'): | |
proxy = ":".join([i.xpath('.//td[1]/text()')[0], | |
i.xpath('.//td[2]/text()')[0]]) | |
proxies.add(proxy) | |
return proxies | |
proxies_set = get_proxies() | |
JSON_MIME_TYPE = 'application/json' | |
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../')) | |
UPLOAD_DIRECTORY = os.path.join(BASE_DIR, 'media/') | |
class CheckVat(MethodView): | |
def __init__(self): | |
self.input_file_df = None | |
self.vat_data = None | |
self.input_file_extension = None | |
self.input_file = None | |
self.output_file = None | |
self.stats = { | |
'valid': 0, | |
'not_valid': 0, | |
'total': 0 | |
} | |
def check_vat_data(self): | |
""" Receive a data-frame object and check VAT number for EU member state using the SOAP Service | |
:return: response | |
""" | |
for index, data in enumerate(self.vat_data): | |
vat_number = pathvalidate.replace_symbol(str(data[0])) | |
country_code = data[1] | |
if not (vat_number == "") or (country_code == ""): | |
cache = SqliteCache(path='./sqlite.db', timeout=6000) | |
transport = Transport(cache=cache, operation_timeout=25000) | |
client = Client( | |
"http://ec.europa.eu/taxation_customs/vies/checkVatService.wsdl", transport=transport) | |
client.transport.session.proxies = proxies_set | |
request_data = { | |
'countryCode': country_code, | |
'vatNumber': vat_number | |
} | |
res = client.service.checkVat(**request_data) | |
input_dict = helpers.serialize_object(res) | |
input_dict.pop('requestDate') | |
response = json.loads(json.dumps(input_dict)) | |
else: | |
response = {'valid': False} | |
data.append(response) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment