Skip to content

Instantly share code, notes, and snippets.

@pyKun
Last active December 17, 2015 18:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pyKun/5651880 to your computer and use it in GitHub Desktop.
Save pyKun/5651880 to your computer and use it in GitHub Desktop.
test swift3 middleware
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author: Kun Huang <academicgareth@gmail.com>
# Created Time: 05/24/13 11:47:24 (CST)
# Modified Time: 05/27/13 16:15:33 (CST)
import boto
from boto.s3.connection import S3Connection
conn= S3Connection(
aws_access_key_id='system:root',
aws_secret_access_key='testpass',
port=8080,
host='127.0.0.1',
is_secure=False,
calling_format=boto.s3.connection.OrdinaryCallingFormat())
#print conn.get_all_buckets()
if 'test_cors':
'''
1. swift doesn't support multi-config
2. swift cors doesn't support allowed-method
'''
bk = conn.get_bucket('books')
# set cors
zk = boto.s3.cors.CORSConfiguration()
allowed_method = ['POST','PUT', 'HEAD', 'DELETE', 'GET']
allowed_origin = ['http://localhost', 'http://192.168.3.76']
ruleid = 'unique_rule'
allowed_header = ['header_1', 'header_2']
max_age = 5000
expose_header = ['header_3']
zk.add_rule(allowed_method,
allowed_origin,
ruleid,
allowed_header,
max_age,
expose_header)
print '##########','setting cors'
bk.set_cors(zk)
print '##########','getting a cors config'
print bk.get_cors()
print '##########','deleting cors config'
bk.delete_cors()
print '##########','getting a empty cors config'
print bk.get_cors()
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author: Kun Huang <academicgareth@gmail.com>
# Created Time: 05/24/13 11:47:24 (CST)
# Modified Time: 05/27/13 17:45:07 (CST)
import boto
from boto.s3.connection import S3Connection
conn= S3Connection(
aws_access_key_id='system:root',
aws_secret_access_key='testpass',
port=8080,
host='127.0.0.1',
is_secure=False,
calling_format=boto.s3.connection.OrdinaryCallingFormat())
#print conn.get_all_buckets()
if 'test_lifecycle':
'''
1. swift doesn't have transation
2. swift doesn't have defination of storage class
so, we just implement expiration on container/bucket.
'''
lc = conn.create_bucket('livevc')
config = boto.s3.lifecycle.Lifecycle()
expir = boto.s3.lifecycle.Expiration(days='5')
if None:
import datetime
now = datetime.datetime.now()
later = now + datetime.timedelta(0,60,0) # 1 min later
expir = boto.s3.lifecycle.Expiration(date=later.isoformat())
config.add_rule(id='unique_lifecycle_rule',
prefix='expir',
status='Enabled',
expiration=expir)
lc.configure_lifecycle(config)
print lc.get_lifecycle_config()
lc.delete_lifecycle_configuration()
print lc.get_lifecycle_config()
lc = conn.delete_bucket('livevc')
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author: Kun Huang <academicgareth@gmail.com>
import boto
from boto.s3.connection import S3Connection
conn= S3Connection(
aws_access_key_id='system:root',
aws_secret_access_key='testpass',
port=8080,
host='127.0.0.1',
is_secure=False,
calling_format=boto.s3.connection.OrdinaryCallingFormat())
#print conn.get_all_buckets()
if 'object_basic':
'''
1. test create a versioned bucket
1.1 create bucket
1.2 set a obj for it
1.3 version it
1.4 set 2nd obj for it
1.5 set 2nd obj again to generate copy in bucket2
2. test delete a versioned object
2.1 delete a obj in bucket1 which has history/virson
2.2 test old contents of that obj
3. test post
3.1 post a versioned obj
999 delete all test data
'''
def clear():
for bucket in conn.get_all_buckets():
for key in bucket.list():
bucket.delete_key(key.name)
conn.delete_bucket(bucket.name)
clear()
conn.create_bucket('naruto')
mr = conn.get_bucket('naruto')
gift = mr.new_key('xiaoying\'gift')
gift.set_contents_from_string('this is a gift from xiaoying')
mr.configure_versioning(True)
# test add
kuwu = mr.new_key('kuwu')
kuwu.set_contents_from_string('a naruto\'s tool which could be copied')
kuwu.set_contents_from_string('Oh only PUTing again make copy in swift')
print '*****List Origin Bucket*****'
print mr.get_all_keys()
print '*****List Versions Location*****'
_mr = conn.get_bucket('_naruto')
print _mr.get_all_keys()
# test delete
mr.delete_key('kuwu')
print mr.get_key('kuwu').get_contents_as_string()
# test post
# nothing here, because s3 treat POST as PUT in html, nothing else
clear()
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author: Kun Huang <academicgareth@gmail.com>
import boto
from boto.s3.connection import S3Connection
conn= S3Connection(
aws_access_key_id='system:root',
aws_secret_access_key='testpass',
port=8080,
host='127.0.0.1',
is_secure=False,
calling_format=boto.s3.connection.OrdinaryCallingFormat())
#print conn.get_all_buckets()
if 'object_basic':
'''
1. test PUT,GET,DELETE,POST,HEAD
'''
team = conn.create_bucket('lakers')
# build new obj
kb = team.new_key('kobe')
kb.set_contents_from_string('***Hello, Im Kobe Bryant***')
_kb = team.get_key('kobe')
print _kb.get_contents_as_string()
_kb.set_contents_from_string('***And Im the best player in NBA!***')
print _kb.get_contents_as_string()
team.delete_key('kobe')
conn.delete_bucket('lakers')
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author: Kun Huang <academicgareth@gmail.com>
# Created Time: 05/24/13 11:47:24 (CST)
# Modified Time: 05/27/13 20:27:25 (CST)
import boto
from boto.s3.connection import S3Connection
conn= S3Connection(
aws_access_key_id='system:root',
aws_secret_access_key='testpass',
port=8080,
host='127.0.0.1',
is_secure=False,
calling_format=boto.s3.connection.OrdinaryCallingFormat())
#print conn.get_all_buckets()
if 'test_meta':
'''
1. implement tag as meta in swift
'''
tagset1 = boto.s3.tagging.TagSet()
tagset2 = boto.s3.tagging.TagSet()
tagset1.add_tag('sports','basketball')
tagset2.add_tag('movies','cangjingkong')
tagset2.add_tag('music','linzhixuan')
tagset2.add_tag('cellphone','iphone')
tags1 = boto.s3.tagging.Tags()
tags2 = boto.s3.tagging.Tags()
tags1.add_tag_set(tagset1)
tags2.add_tag_set(tagset2)
mm = conn.create_bucket('metaman')
mm.set_tags(tags1)
print mm.get_tags()
mm.delete_tags()
print mm.get_tags()
mm.set_tags(tags2)
print mm.get_tags()
mm.delete_tags()
print mm.get_tags()
conn.delete_bucket('metaman')
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author: Kun Huang <academicgareth@gmail.com>
# Created Time: 05/24/13 11:47:24 (CST)
# Modified Time: 05/27/13 19:27:42 (CST)
import boto
from boto.s3.connection import S3Connection
conn= S3Connection(
aws_access_key_id='system:root',
aws_secret_access_key='testpass',
port=8080,
host='127.0.0.1',
is_secure=False,
calling_format=boto.s3.connection.OrdinaryCallingFormat())
#print conn.get_all_buckets()
if 'test_versioning':
'''
1. swift doesn't support mfa delete
its implement cost more time, so I will not do it in this version
'''
conn.delete_bucket('yinxiaowei')
yxv = conn.create_bucket('yinxiaowei')
yxv.configure_versioning(True)
print yxv.get_versioning_status()
yxv.configure_versioning(False)
print yxv.get_versioning_status()
conn.delete_bucket('yinxiaowei')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment