Skip to content

Instantly share code, notes, and snippets.

@fhdez
Created September 20, 2018 14:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fhdez/73294d6dbdba35250e1cc197df5c17d3 to your computer and use it in GitHub Desktop.
Save fhdez/73294d6dbdba35250e1cc197df5c17d3 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import datetime
import re
import unicodecsv as csv
from datetime import date, timedelta
from optparse import make_option
from django.core.exceptions import ObjectDoesNotExist
from django.core.management.base import BaseCommand
from redmag.acuerdo.models import SubCategoria as Tema
from redmag.meds.models import (
Archivo,
Categoria as Asignatura,
Competencia, Enlace, Grado, Med,
SubCategoria as Categoria,
Video
)
from redmag.planeaciones.models import Etapa, Planeacion
from redmag.usuarios.models import Usuario
class Command(BaseCommand):
"""
Importa MEDs desde un archivo CSV
P.E
export LANG='en_US.UTF-8'
export LC_ALL='en_US.UTF-8'
./manage.py meds_importar
respaldos/meds.csv
respaldos/meds_enlace.csv
respaldos/meds_video.csv
respaldos/meds_archivo.csv
respaldos/planeaciones.csv
"""
args = (
'<csv_path_meds> <csv_path_enlaces> '
'<csv_path_videos> <csv_path_archivos> '
'<csv_path_planeaciones>'
)
help = 'Importa MEDs desde un archivo CSV, '
option_list = BaseCommand.option_list + (
make_option(
'--base_path',
action='store',
dest='base_path',
default='',
help='Specify the path where the images are located'
),
)
delimiter = str('^')
def handle(self, *args, **options):
self.uploads_path = options['base_path']
if len(args) == 5:
csv_path_meds = args[0]
csv_path_enlaces = args[1]
csv_path_videos = args[2]
csv_path_archivos = args[3]
csv_path_planeaciones = args[4]
meds = self.parse_meds(csv_path_meds)
self.create_meds(meds)
enlaces = self.parse_enlaces(csv_path_enlaces)
self.create_enlaces(enlaces)
videos = self.parse_videos(csv_path_videos)
self.create_videos(videos)
archivos = self.parse_archivos(csv_path_archivos)
self.create_archivos(archivos)
self.parse_planeaciones(csv_path_planeaciones)
else:
self.stdout.write("ERROR: Número incorrecto de argumentos")
def parse_number(self, value):
"""
Extrae un número convirtiendolo a un formato usable por el script.
"""
return re.sub(r'\D', '', value)
def parse_bool_string(self, string):
return string[0].upper() == 'T'
def parse_meds(self, csv_path):
"Devuelve los valores limpios"
with open(csv_path, 'r') as csv_file:
# Dictionary from file
dictionary = csv.DictReader(csv_file, delimiter=self.delimiter)
meds_models = []
for row_number, fields in enumerate(dictionary):
print("\r Procesando CSV Meds..." + str(row_number)),
nombre_del_documento = fields["nombre_del_documento"]
descripcion_del_documento = fields["descripcion_del_documento"]
estado = fields["estado"]
imagen = fields["imagen"]
inteligencia_linguistica = fields["inteligencia_linguistica"]
inteligencia_logica_matematica = (
fields["inteligencia_logica_matematica"])
inteligencia_espacial = fields["inteligencia_espacial"]
inteligencia_corporal_cinestesica = (
fields["inteligencia_corporal_cinestesica"])
inteligencia_musical = fields["inteligencia_musical"]
inteligencia_naturalista = fields["inteligencia_naturalista"]
moderada = self.parse_bool_string(fields["moderada"])
url = fields["url"]
descripcion_forma_de_uso = fields["descripcion_forma_de_uso"]
fecha_subida = (
datetime.datetime.strptime(
fields["fecha_subida"], "%Y-%m-%d").date()
)
busquedas = fields["busquedas"]
contento = self.parse_bool_string(fields["contento"])
aburrido = self.parse_bool_string(fields["aburrido"])
distraido = self.parse_bool_string(fields["distraido"])
inquieto = self.parse_bool_string(fields["inquieto"])
sereno = self.parse_bool_string(fields["sereno"])
jugueton = self.parse_bool_string(fields["jugueton"])
impaciente = self.parse_bool_string(fields["impaciente"])
creativo = self.parse_bool_string(fields["creativo"])
atento = self.parse_bool_string(fields["atento"])
curioso = self.parse_bool_string(fields["curioso"])
participativo = self.parse_bool_string(fields["participativo"])
en_portada = self.parse_bool_string(fields["en_portada"])
is_meritum = self.parse_bool_string(fields["is_meritum"])
slug = fields["slug"]
excitado = fields["excitado"]
inteligencia_interpersonal = (
fields["inteligencia_interpersonal"])
inteligencia_intrapersonal = (
fields["inteligencia_intrapersonal"])
is_evaluacion = self.parse_bool_string(fields["is_evaluacion"])
usuario = fields["usuario"]
categoria = fields["categoria"]
tema = fields["tema"]
try:
usuario = Usuario.objects.get(email=usuario)
except:
usuario = Usuario.objects.get(
email='redmagisterial@outlook.com'
)
obj_categoria = None
if categoria:
obj_categoria = Categoria.objects.filter(
texto=categoria
)[0]
obj_tema = None
if tema:
obj_tema = Tema.objects.filter(texto=tema)[0]
med = Med(
fecha_subida=fecha_subida,
usuario=usuario,
nombre_del_documento=nombre_del_documento,
descripcion_del_documento=descripcion_del_documento,
descripcion_forma_de_uso=descripcion_forma_de_uso,
estado=estado,
moderada=moderada,
imagen=imagen,
categoria=obj_categoria, # None en caso de que no sea una evaluación
tema=obj_tema, # None en caso de que no sea una evaluación
is_meritum=is_meritum,
slug=slug,
is_evaluacion=is_evaluacion,
# agregar is_nuevomodelo=True si corresponde al nuevo modelo educativo
# agregar bloque='I' que este sugerido el med
# agregar aprendizajeesperado=obj_aprendizajeesperado que es el nuevo que determina el NME.
inteligencia_linguistica=inteligencia_linguistica,
inteligencia_logica_matematica=(
inteligencia_logica_matematica),
inteligencia_espacial=inteligencia_espacial,
inteligencia_corporal_cinestesica=(
inteligencia_corporal_cinestesica),
inteligencia_interpersonal=inteligencia_interpersonal,
inteligencia_intrapersonal=inteligencia_intrapersonal,
inteligencia_musical=inteligencia_musical,
inteligencia_naturalista=inteligencia_naturalista,
aburrido=aburrido,
inquieto=inquieto,
creativo=creativo,
jugueton=jugueton,
distraido=distraido,
contento=contento,
sereno=sereno,
excitado=excitado,
impaciente=impaciente,
atento=atento,
curioso=curioso,
participativo=participativo,
url=url,
busquedas=busquedas,
en_portada=en_portada,
)
meds_models.append(med)
return meds_models
def create_meds(self, object_list):
"""
Crea la planeación con los valores provistos.
"""
obj_getted = 0
obj_created = 0
for item in object_list:
try:
rs = Med.objects.filter(
slug=item.slug
)
for r in rs:
print r.id
if rs.count() > 0:
self.imprimirsalidaimportacion(
"Existente",
item,
Med
)
obj_getted += 1
else:
raise Med.DoesNotExist
except Med.DoesNotExist:
item.save()
self.imprimirsalidaimportacion(
"Creado",
item,
Med
)
obj_created += 1
self.stdout.write("%d Existentes" % obj_getted)
self.stdout.write("%d Creados" % obj_created)
def parse_enlaces(self, csv_path):
"Devuelve los valores limpios"
with open(csv_path, 'r') as csv_file:
# Dictionary from file
dictionary = csv.DictReader(csv_file, delimiter=self.delimiter)
enlace_models = []
for row_number, fields in enumerate(dictionary):
print("\r Procesando CSV Enlaces..." + str(row_number)),
url = fields["url"]
med_slug = fields["med"]
obj_med = Med.objects.get(slug=med_slug)
enlace = Enlace(
url=url,
med=obj_med,
)
enlace_models.append(enlace)
return enlace_models
def create_enlaces(self, object_list):
"""
Crea la planeación con los valores provistos.
"""
obj_created = 0
for item in object_list:
item.save()
self.imprimirsalidaimportacion(
"Creado",
item,
Enlace
)
obj_created += 1
self.stdout.write("%d Creados" % obj_created)
def parse_videos(self, csv_path):
"Devuelve los valores limpios"
with open(csv_path, 'r') as csv_file:
# Dictionary from file
dictionary = csv.DictReader(csv_file, delimiter=self.delimiter)
video_models = []
for row_number, fields in enumerate(dictionary):
print("\r Procesando CSV Videos..." + str(row_number)),
url = fields["url"]
med_slug = fields["med"]
obj_med = Med.objects.get(slug=med_slug)
video = Video(
url=url,
med=obj_med,
)
video_models.append(video)
return video_models
def create_videos(self, object_list):
"""
Crea la planeación con los valores provistos.
"""
obj_created = 0
for item in object_list:
item.save()
self.imprimirsalidaimportacion(
"Creado",
item,
Video
)
obj_created += 1
self.stdout.write("%d Creados" % obj_created)
def parse_archivos(self, csv_path):
"Devuelve los valores limpios"
with open(csv_path, 'r') as csv_file:
# Dictionary from file
dictionary = csv.DictReader(csv_file, delimiter=self.delimiter)
archivo_models = []
for row_number, fields in enumerate(dictionary):
print("\r Procesando CSV Archivos..." + str(row_number)),
archivo = fields["archivo"]
med_slug = fields["med"]
obj_med = Med.objects.get(slug=med_slug)
archivo = Archivo(
archivo=archivo,
med=obj_med,
)
archivo_models.append(archivo)
return archivo_models
def create_archivos(self, object_list):
"""
Crea la planeación con los valores provistos.
"""
obj_created = 0
for item in object_list:
item.save()
self.imprimirsalidaimportacion(
"Creado",
item,
Archivo
)
obj_created += 1
self.stdout.write("%d Creados" % obj_created)
def parse_planeaciones(self, csv_path):
"Devuelve los valores limpios"
with open(csv_path, 'r') as csv_file:
# Dictionary from file
dictionary = csv.DictReader(csv_file, delimiter=self.delimiter)
planeacion_models = []
correctos = 0
creados = 0
errores = 0
print("Procesando CSV Planeaciones...")
for row_number, fields in enumerate(dictionary):
grado = fields["grado"]
categoria_val = fields["categoria_val"]
subcategoria_val = fields["subcategoria_val"]
subcategoria_texto = fields["subcategoria_texto"]
semana = fields["semana"]
sesion = fields["sesion"]
usuario = fields["usuario"]
tema = fields["tema"]
competencia = fields["competencia"]
bloque = fields["bloque"]
aprendizaje = fields["aprendizaje"]
evaluacion = fields["evaluacion"]
inicio_tiempo = fields["inicio_duracion"]
inicio_contenido = fields["inicio_contenido"]
inicio_recursos = fields["inicio_recursos"]
desarrollo_tiempo = fields["desarrollo_duracion"]
desarrollo_contenido = fields["desarrollo_contenido"]
desarrollo_recursos = fields["desarrollo_recursos"]
conclusion_tiempo = fields["conclusion_duracion"]
conclusion_contenido = fields["conclusion_contenido"]
conclusion_recursos = fields["conclusion_recursos"]
paginas = fields["paginas"]
planeaciones = Planeacion.objects.filter(
grado__val=grado,
categoria__categoria__val=categoria_val,
semana=semana,
sesion=sesion
)
if planeaciones.count() > 0:
print 'repetido: {0}'.format(planeaciones.count())
planeacion = planeaciones[0]
inicio = self.etapa('inicio', locals())
desarrollo = self.etapa('desarrollo', locals())
conclusion = self.etapa('conclusion', locals())
planeacion.inicio = inicio
planeacion.desarrollo = desarrollo
planeacion.conclusion = conclusion
planeacion.save()
print "Existente planeación - {0} - {1}".format(
planeacion.id, planeacion.slug
)
print "-----------------------------------------------"
correctos += 1
else:
print "Intentando crear planeacion"
usuario = self.default(
Usuario, 476, email=usuario
)
grado = Grado.objects.get(
val=grado,
)
subcategoria_val = "-".join(
subcategoria_val.split("-")[1:]
)
asignatura = Asignatura.objects.get(
val=categoria_val
)
categoria, created = (
asignatura.subcategorias.get_or_create(
texto=subcategoria_texto
)
)
try:
competencia_obj = Competencia.objects.get(
texto=competencia
)
except:
competencia_obj = Competencia(
texto=competencia
)
obj = Planeacion(
categoria=categoria,
grado=grado,
competencia=competencia_obj,
usuario=usuario,
semana=semana,
sesion=sesion,
bloque=bloque,
aprendizaje=aprendizaje,
evaluacion=evaluacion,
inicio=self.etapa('inicio', locals()),
desarrollo=self.etapa('desarrollo', locals()),
conclusion=self.etapa('conclusion', locals()),
comienzo=date.today(),
final=date.today() + timedelta(days=7),
es_moderada=True,
es_oficial=True,
estado='A'
)
inicio = self.etapa('inicio', locals())
desarrollo = self.etapa('desarrollo', locals())
conclusion = self.etapa('conclusion', locals())
obj.inicio = inicio
obj.desarrollo = desarrollo
obj.conclusion = conclusion
obj.save()
print "Creada planeación - {0} - {1}".format(
obj.id, obj.slug
)
print "-----------------------------------------------"
creados += 1
print "correctos: {0}".format(correctos)
print "creados: {0}".format(creados)
print "errores: {0}".format(errores)
def etapa(self, nombre, variables):
duration = variables[nombre + '_tiempo'][:5]
etapa = Etapa.objects.create(
pagina=variables['paginas'][:50],
contenido=variables[nombre + '_contenido'],
duracion=duration
)
recursos = variables[nombre + '_recursos']
recursos = recursos.strip('{').strip('}').split(',')
for recurso in recursos:
if recurso:
med = Med.objects.get(slug=recurso)
etapa.recursos.add(med)
return etapa
def parse_bloque(self, bloque):
if bloque.isdigit():
return self.bloques[int(bloque) - 1]
if bloque not in self.bloques:
message = u'Bloque: %s no es un valor válido' % bloque
print message.encode('utf-8')
raise
return bloque
def default(self, model, _id, **filters):
"""
Busca el elemento de acuerdo a los filtros o asigna el valor
default especificado.
"""
try:
return model.objects.get(**filters)
except ObjectDoesNotExist:
message = u'%s: no se encuentra %s se asignará %s %d' % (
model.__class__.__name__,
filters,
model.__class__.__name__,
_id
)
self.stderr.write(message.encode('utf-8'))
return model.objects.get(id=_id)
def parse_week(self, value):
"""
Extrae el valor de la semana y determina la semana
"""
value = value.split('_')
return [
re.sub(r'\D', '', value[0]),
value[1][0] if len(value) > 1 else 'a'
]
def asignatura(self, grado, name):
"""
Ejecuta sustitución de nombre de asignatura y devuelve la asignatura.
"""
name = name.strip()
try:
return grado.ciclo.categorias.get(
val=name
)
except ObjectDoesNotExist:
message = u'Asignatura: %s no existe' % name
print message.encode('utf-8')
raise
def imprimirsalidaimportacion(self, action, item, model_type):
descripcion_impresion = None
if model_type == Med:
descripcion_impresion = item.slug
elif model_type == Enlace or model_type == Video:
descripcion_impresion = item.url
elif model_type == Archivo:
descripcion_impresion = item.archivo
print ((
'%s %s - %s' % (
action,
model_type._meta.verbose_name,
descripcion_impresion
)
).encode('utf-8'))
print "--------------------------------------------------------"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment