Examples¶
Sometimes the easiest way to learn is by seeing the library in action. Here is a collection of
fully working examples that demonstrate the flexibility and ease of use of the absio
library.
Note
These examples require the installation of the Click library.
User Manipulation¶
This tool allows you to create users and modify their credentials. Users can also be imported into new systems:
#!/usr/bin/env python
"""A sample application involving users.
You may create and delete users. Additionally you may manage user's authentication and backup
credentials.
"""
import click
import absio
import logging
import sys
from functools import partial
# Application name handed to absio.initialize() by every command below.
APP_NAME = 'python-absio-user-cli'
# Colour-coded console writers: each is click.secho with a preset foreground colour.
error = partial(click.secho, fg='red')
warn = partial(click.secho, fg='yellow')
info = partial(click.secho, fg='white')
success = partial(click.secho, fg='green')
@click.group()
@click.option('--debug/--no-debug', default=False)
def cli(debug):
    # Root command group. (Kept as a comment rather than a docstring so the
    # generated --help output stays unchanged.)
    # Without --debug there is nothing to configure.
    if not debug:
        return
    # With --debug, send the 'absio' logger's output to the console at DEBUG level.
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    absio_log = logging.getLogger('absio')
    absio_log.addHandler(console)
    absio_log.setLevel(logging.DEBUG)
# A common set of options used for server specification.
# Each entry is a click option decorator; apply_options() stacks them onto a command.
server_options = (
    click.option('--api-key', required=True),
    click.option('--url', default='https://sandbox.absio.com'),
)
# Options used to log a specific account into the server.
# All three are required by every command that authenticates.
login_options = (
    click.option('--user-id', required=True),
    click.option('--password', required=True),
    click.option('--backup-phrase', required=True),
)
def apply_options(*options):
    """Bundle several decorators into a single decorator.

    The decorators are applied last-to-first, so the result is the same as
    stacking them on the function in the order they were given.
    """
    def decorate(func):
        wrapped = func
        # Walk the decorators from the end, exactly as stacked '@' lines would.
        for index in range(len(options) - 1, -1, -1):
            wrapped = options[index](wrapped)
        return wrapped
    return decorate
@cli.command()
@apply_options(*server_options)
@click.option('--password', required=True)
@click.option('--reminder', required=True)
@click.option('--backup-phrase', required=True)
def create(api_key, url, password, reminder, backup_phrase):
    """Registers a new user."""
    info('Creating user.')
    absio.initialize(api_key, app_name=APP_NAME, server_url=url)
    try:
        new_user = absio.user.create(
            password=password,
            reminder=reminder,
            passphrase=backup_phrase,
        )
    except Exception as exc:
        # Report the failure in red and leave with a non-zero exit status.
        failure = 'Failed to create user: {e}'.format(e=exc)
        error(failure)
        sys.exit(1)
    else:
        success('User created: {user}'.format(user=new_user))
@cli.command()
@apply_options(*(server_options + login_options))
def delete(api_key, url, user_id, password, backup_phrase):
    """Permanently removes a user."""
    info('Deleting user.')
    absio.initialize(api_key, app_name=APP_NAME, server_url=url)
    try:
        # Log in first, then delete the logged-in user.
        absio.login(user_id, password=password, passphrase=backup_phrase)
        absio.user.delete()
    except Exception as exc:
        failure = 'Failed to delete user: {e}'.format(e=exc)
        error(failure)
        sys.exit(1)
    else:
        success('Deleted user: {id}'.format(id=user_id))
def _login(api_key, url, user_id, password, backup_phrase):
    """Initialize the library and log the given user in.

    Prints a red failure message and exits with status 1 if anything raises;
    otherwise prints a green confirmation that includes the value returned by
    absio.login().
    """
    info('Logging in.')
    absio.initialize(api_key, app_name=APP_NAME, server_url=url)
    try:
        logged_in = absio.login(user_id, password=password, passphrase=backup_phrase)
    except Exception as exc:
        failure = 'Failed to login user: {e}'.format(e=exc)
        error(failure)
        sys.exit(1)
    else:
        success('Successfully logged in user: {user}'.format(user=logged_in))
@cli.command()
@apply_options(*(server_options + login_options))
def login(api_key, url, user_id, password, backup_phrase):
    """Verifies credentials by logging in."""
    # All the work, including error reporting, happens in the shared helper.
    _login(
        api_key=api_key,
        url=url,
        user_id=user_id,
        password=password,
        backup_phrase=backup_phrase,
    )
@cli.command()
@apply_options(*server_options)
@click.option('--user-id', required=True)
def getreminder(api_key, url, user_id):
    """Returns the publicly accessible reminder for the user's backup passphrase."""
    info('Fetching reminder information.')
    absio.initialize(api_key, app_name=APP_NAME, server_url=url)
    try:
        # No login is performed before this lookup.
        hint = absio.user.get_backup_reminder(user_id)
    except Exception as exc:
        failure = 'Failed to fetch reminder information: {e}'.format(e=exc)
        error(failure)
        sys.exit(1)
    else:
        success('Reminder information: {reminder}'.format(reminder=hint))
@cli.command()
@apply_options(*(server_options + login_options))
@click.option('--new-password', required=True)
@click.option('--new-backup-phrase', required=True)
@click.option('--new-reminder', required=False)
def change_credentials(api_key, url, user_id, password, backup_phrase, new_password, new_backup_phrase, new_reminder):
    """Changes a user's password, backup phrase and reminder."""
    # The help text above and the progress message below previously mentioned
    # only the backup phrase and reminder, although the password is replaced too.
    info('Changing password, backup passphrase and reminder.')
    # _login() exits the process itself when the current credentials are rejected.
    _login(api_key, url, user_id, password, backup_phrase)
    try:
        absio.user.change_credentials(
            password=new_password,
            passphrase=new_backup_phrase,
            reminder=new_reminder,
        )
    except Exception as e:
        error('Failed to change backup credentials: {e}'.format(e=e))
        sys.exit(1)
    success('Successfully changed backup passphrase to {p}'.format(p=new_backup_phrase))
    success('Successfully changed backup reminder to {r}'.format(r=new_reminder))
# Run the command group only when this file is executed as a script.
if __name__ == '__main__':
    cli()
Content Sharing¶
Being able to securely and easily share content with other users is the main use case for using
the absio library. This tool allows you to select local files and the recipients with whom they
should be shared:
#!/usr/bin/env python
"""A sample application involving containers.
You may create, update, and delete containers.
"""
import arrow
import absio
import click
import json
import os
import pprint
import sys
import tempfile
import uuid
from copy import deepcopy
from functools import partial, reduce
# Application name handed to absio.initialize() in main().
APP_NAME = 'python-absio-container-cli'
# Colour-coded console writers: each is click.secho with a preset foreground colour.
error = partial(click.secho, fg='red')
warn = partial(click.secho, fg='yellow')
info = partial(click.secho, fg='white')
success = partial(click.secho, fg='green')
# Container attributes printed by read(), in display order. Dotted names are
# resolved with rgetattr().
container_attrs = [
    'access',
    'content',
    'created_at',
    'created_by',
    'encrypted',
    'header.data',
    'id',
    'modified_at',
    'modified_by',
    'type',
]
# Dotted permission attribute paths that _set_permissions() asks about one by
# one; declined entries are set to False with rsetattr().
access_perms = [
    'access.view',
    'access.modify',
    'container.download',
    'container.decrypt',
    'container.upload',
    'container.type.view',
    'container.type.modify',
]
def rsetattr(obj, attr, val):
    """Set a possibly dotted attribute path (e.g. 'a.b.c') on obj.

    Everything before the last dot is resolved with rgetattr(); the final
    component is then assigned on that object. Returns whatever setattr()
    returns.
    """
    parent_path, _, leaf = attr.rpartition('.')
    if parent_path:
        target = rgetattr(obj, parent_path)
    else:
        # No dot in the path: assign directly on obj.
        target = obj
    return setattr(target, leaf, val)
# Unique marker meaning "no default supplied", so that None remains usable as
# an explicit default for rgetattr().
sentinel = object()


def rgetattr(obj, attr, default=sentinel):
    """Resolve a dotted attribute path (e.g. 'a.b.c') starting from obj.

    Without a default, a missing attribute raises AttributeError just as
    getattr() would. With a default, each lookup step falls back to it.
    """
    current = obj
    for part in attr.split('.'):
        if default is sentinel:
            current = getattr(current, part)
        else:
            current = getattr(current, part, default)
    return current
def getchar():
    """A wrapper for click's getchar function.

    This is due to: https://github.com/pallets/click/issues/822

    Returns the pressed key lower-cased and always as text: when click hands
    back bytes they are decoded with stdin's encoding.
    """
    c = click.getchar().lower()
    if isinstance(c, bytes):
        # sys.stdin.encoding may exist but be None (e.g. redirected input),
        # which would make decode() raise TypeError; fall back to cp1252 then too.
        enc = getattr(sys.stdin, 'encoding', None) or 'cp1252'
        c = c.decode(enc, 'replace')
    return c
def _set_value(name, current):
    """Clear the screen, show the current value of a field and prompt for a new one.

    Returns whatever the user typed at the prompt.
    """
    click.clear()
    heading = 'Set Container {}\n'.format(name.capitalize())
    warn(heading)
    info('Current: {}\n'.format(current))
    return click.prompt('New {}'.format(name.lower()))
def _pprint_access(access, title='Access', menunum=None):
    """Print an access setting, optionally prefixed with a menu number.

    Three shapes are handled: None (shown as "(None)"), a dict mapping user IDs
    to objects with .expiration and .permissions, and a plain list of user IDs.
    Any other value prints only the title.
    """
    def _field(label, value):
        # One "label: value" line with the label in green.
        click.echo(click.style(label, fg='green') + ': {}'.format(value))

    heading = click.style(title, fg='green')
    if menunum:
        heading = click.style(' {}. '.format(menunum), fg='blue', bold=True) + heading
    # No newline yet: the "(None)" marker or the blank info() call ends the line.
    click.echo(heading, nl=False)

    if access is None:
        info(' (None)')
        return
    if isinstance(access, dict):
        info('')
        for uid, grant in access.items():
            _field(' User ID', uid)
            _field(' Expiration', grant.expiration)
            _field(' Permissions', grant.permissions)
        return
    if isinstance(access, list):
        info('')
        for uid in access:
            _field(' User ID', uid)
def menu(num, opt):
    """Print one menu entry: the key in bold blue followed by '. ' and the label."""
    key = click.style(' {}'.format(num), fg='blue', bold=True)
    label = '. {}'.format(opt)
    click.echo(key + label)
def _container_attr_menu(attrs):
    """Print the numbered list of editable container fields with their current values.

    attrs is the dict with 'header', 'content', 'type' and 'access' keys that
    create() and update() build. Content longer than 33 items is shown as its
    first 30 followed by '...'.
    """
    def _row(number, label, value):
        click.echo(
            click.style(' {}. '.format(number), fg='blue', bold=True)
            + click.style(label, fg='green')
            + ' ({})'.format(value)
        )

    info('\nItem:\n')
    _row(1, 'Header', attrs['header'])
    preview = attrs['content']
    if preview is not None and len(preview) > 33:
        preview = preview[:30] + b'...'
    _row(2, 'Content', preview)
    _row(3, 'Type', attrs['type'])
    _pprint_access(attrs['access'], title='Access', menunum=4)
def _set_container_content(existing):
    """Interactively choose container content.

    Returns a (content, filename) tuple. filename is None unless the content
    was loaded from a file, in which case it is the file's base name. Choosing
    "Done" returns the existing content unchanged.
    """
    while True:
        click.clear()
        warn('Set container content:\n')
        if existing is None:
            preview = None
        elif len(existing) > 33:
            # Keep the display short for large payloads.
            preview = existing[:30] + b'...'
        else:
            preview = existing
        click.echo(click.style('Current Content', fg='green') + ': ({})\n'.format(preview))
        menu(1, 'Raw Input.')
        menu(2, 'From File.')
        menu('D', 'Done.\n')
        choice = getchar()
        if choice == 'd':
            return (existing, None)
        elif choice == '1':
            typed = click.prompt('Enter container data')
            return (typed.encode('utf-8'), None)
        elif choice == '2':
            handle = click.prompt('Enter file to load', type=click.File('rb'))
            payload = handle.read()
            return (payload, os.path.basename(handle.name))
        # Any other key simply redraws the menu.
def _set_container_access(existing):
    """Interactively choose who may access a container.

    Returns one of:
      * None: "Default access" was chosen.
      * a list of canonical UUID strings: "By user ID(s)".
      * whatever _set_advanced_container_access() returns: "Advanced setting".
      * existing, unchanged: "Done".
    """
    while True:
        click.clear()
        warn('Set container access:\n')
        _pprint_access(existing, title='Current Access')
        menu('\n 1', 'Default access.')
        menu(2, 'By user ID(s).')
        menu(3, 'Advanced setting.')
        menu('\n D', 'Done.')
        c = getchar()
        if c == '1':
            # Default access is represented by None.
            return None
        elif c == '2':
            while True:
                value = click.prompt('Enter comma seperated User IDs')
                # Verify that we have a list of UUIDs
                try:
                    return [str(uuid.UUID(val.strip())) for val in value.split(',')]
                except ValueError:
                    # uuid.UUID raises ValueError for malformed input; ask again.
                    error('Value must be comma seperated UUID strings.')
        elif c == '3':
            return _set_advanced_container_access(existing)
        elif c == 'd':
            # Leaving without a choice must keep the current setting. Returning
            # None here would silently reset it to default access.
            return existing
def _set_individual_access(user_id, existing=None):
    """Interactively build the access entry for a single user.

    existing, when given, is an object with .expiration and .permissions whose
    values seed the menu. Returns a new absio.access.Access built from the
    final expiration and permissions.
    """
    # Work on a plain dict so the passed-in object is never modified.
    if existing is None:
        existing = dict(expiration=None, permissions=None)
    else:
        existing = dict(expiration=existing.expiration, permissions=existing.permissions)
    while True:
        click.clear()
        warn('Setting individual user access for {}:\n'.format(user_id))
        info(' 1. Expiration ({})'.format(existing['expiration']))
        info(' 2. Permissions ({})'.format(existing['permissions']))
        info(' D. Done')
        c = getchar()
        if c == '1':
            while True:
                value = click.prompt('Expiration', default='')
                if value == '':
                    # An empty answer clears the expiration.
                    existing['expiration'] = None
                    break
                try:
                    existing['expiration'] = arrow.get(value).datetime
                except Exception:
                    # Was a bare "except:", which also traps KeyboardInterrupt
                    # and SystemExit. Only genuine parse errors should re-prompt.
                    error('Unable to convert to time, try again.')
                else:
                    break
        elif c == '2':
            existing['permissions'] = _set_permissions(user_id)
        elif c == 'd':
            break
    # Now that the user is finished fine tuning the params, we can construct an access obj.
    access = absio.access.Access(user_id=user_id, expiration=existing['expiration'], permissions=existing['permissions'])
    return access
def _prompt_known_access_key(access):
    """Prompt for a user ID until it matches a key of access; return that key.

    Keys are compared by their string form, so the match works whether the
    mapping is keyed by UUID strings or by uuid.UUID objects.
    """
    by_text = {str(key).lower(): key for key in access}
    while True:
        text = str(click.prompt('Enter User ID', type=uuid.UUID))
        if text in by_text:
            return by_text[text]
        error('No access entry exists for that user ID.')


def _set_advanced_container_access(existing):
    """Interactively add, edit and remove per-user access entries.

    When existing is a dict it is edited in place; any other value (None or a
    list) starts from an empty mapping. Returns the mapping of user ID to
    access object.
    """
    if isinstance(existing, dict):
        access = existing
    else:
        # If existing access isn't a dict, that means it's either None (default) or a list. Neither
        # of which matter when configuring advanced access.
        access = dict()
    while True:
        click.clear()
        warn('Setting advanced access:\n')
        # Show the mapping being edited so that newly added users are visible;
        # until something has been added, fall back to what was passed in.
        _pprint_access(access or existing, title='Current Access')
        info('\n 1. Add User')
        info(' 2. Edit User')
        info(' 3. Remove User')
        info(' D. Done\n')
        c = getchar()
        if c == '1':
            user_id = str(click.prompt('Enter User ID', type=uuid.UUID))
            access[user_id] = _set_individual_access(user_id)
        elif c in ('2', '3'):
            if not access:
                # Prompting for an existing ID could never succeed here.
                error('There are no users to edit or remove yet.')
                click.pause()
                continue
            # Previously the prompt returned a uuid.UUID while "Add User" stores
            # str keys, so the membership test never matched and looped forever.
            key = _prompt_known_access_key(access)
            if c == '2':
                access[key] = _set_individual_access(key, access[key])
            else:
                access.pop(key)
        elif c == 'd':
            break
    return access
def _set_permissions(user_id):
    """Ask about each permission in access_perms and return the resulting object.

    Starts from a fresh absio.permissions.Permissions() and sets to False only
    the entries the user declines; confirmed entries are left untouched.
    """
    click.clear()
    warn('Setting permissions for {}\n'.format(user_id))
    perms = absio.permissions.Permissions()
    for name in access_perms:
        granted = click.confirm(' ' + name, default=True)
        if granted:
            continue
        rsetattr(perms, name, False)
    return perms
def create():
    """Interactive screen that collects container fields and creates the container.

    Loops until the user either creates the container ('C') or returns to the
    main menu ('M'). Loading content from a file also sets the type to 'File'
    and the header to the file's name.
    """
    # Where we'll store the currently selected info prior to creation.
    attrs = {
        'header': None,
        'content': None,
        'type': None,
        'access': None,
    }
    while True:
        click.clear()
        warn('Create a container.')
        _container_attr_menu(attrs)
        menu('\n C', 'Create it')
        menu('M', 'Main menu\n')
        c = getchar()
        if c == 'm':
            break
        elif c == '1':
            attrs['header'] = _set_value('header', attrs['header'])
        elif c == '2':
            attrs['content'], filename = _set_container_content(attrs['content'])
            if filename:
                attrs['type'] = 'File'
                attrs['header'] = filename
        elif c == '3':
            attrs['type'] = _set_value('Type', attrs['type'])
        elif c == '4':
            attrs['access'] = _set_container_access(attrs['access'])
        elif c == 'c':
            # Allow string input of the JSON header ...
            try:
                header = json.loads(attrs['header'])
            except (TypeError, ValueError):
                # ... and fall back to the raw value when it is unset (TypeError)
                # or not valid JSON (JSONDecodeError is a ValueError). This was a
                # bare "except:", which would also swallow KeyboardInterrupt.
                header = attrs['header']
            kwargs = {
                'header': header,
                'content': attrs['content'],
                'type': attrs['type'],
                'access': attrs['access'],
            }
            container = absio.container.create(**kwargs)
            success('\nCreated ' + str(container) + '\n')
            click.pause()
            break
def read():
    """Prompt for a container ID and display that container's attributes.

    For containers of type 'File' the content is written to a temporary file
    whose path is shown; the file is removed once the user continues. On a
    failed fetch the user may retry; otherwise control returns to the caller.
    """
    click.clear()
    warn('Read a container.\n')
    container_id = click.prompt('Container ID', type=uuid.UUID)
    try:
        container = absio.container.get(container_id)
    except Exception as e:
        error('Unable to get container: {}'.format(e))
        if click.confirm('Would you like to try again?'):
            read()
        # There is nothing to display. The old code fell through and used the
        # unbound name 'container' (after nesting another main_menu() call).
        return
    success('\n' + str(container) + '\n')
    is_file = container.type == 'File'
    filename = None
    try:
        for attr in container_attrs:
            if attr == 'access':
                click.echo(click.style('{:>12}'.format(attr), fg='green') + ':')
                for recip, access in getattr(container, attr).items():
                    click.echo('{}: {}'.format(recip, access))
                    click.echo(click.style(' Created By', fg='magenta') + ': {}'.format(pprint.pformat(access.created_by)))
                    click.echo(click.style(' Created At', fg='magenta') + ': {}'.format(access.created_at))
                    click.echo(click.style(' Modified By', fg='magenta') + ': {}'.format(pprint.pformat(access.modified_by)))
                    click.echo(click.style(' Modified At', fg='magenta') + ': {}'.format(access.modified_at))
            elif attr == 'content':
                content = container.content.data
                if is_file:
                    # Write it out to a tmp file and display the file system location.
                    # The header comes from the container, which another user may
                    # have written, so keep only its base name: an absolute or
                    # '../' path must not escape the temp directory.
                    safe_name = os.path.basename(str(container.header.data)) or str(container.id)
                    path = os.path.join(tempfile.gettempdir(), safe_name)
                    with open(path, 'wb') as f:
                        # Remember the path only once the file really exists.
                        filename = path
                        f.write(content if content is not None else b'')
                    content = path
                elif content is not None and len(content) > 33:
                    # Keep the display short for large payloads.
                    content = content[:30] + b'...'
                click.echo(click.style('{:>12}'.format(attr), fg='green') + ': ' + str(content))
            else:
                click.echo(click.style('{:>12}'.format(attr), fg='green') + ': ' + str(rgetattr(container, attr)))
        info('')
        click.pause()
    finally:
        # Remove the temporary copy even if displaying the container failed.
        if filename is not None:
            os.unlink(filename)
def update():
    """Prompt for a container ID, let the user edit its fields, then update it.

    Only fields whose value differs from the fetched container are sent to
    absio.container.update(). On a failed fetch the user may retry; otherwise
    control returns to the caller.
    """
    click.clear()
    warn('Update a container.\n')
    container_id = click.prompt('Container ID', type=uuid.UUID)
    try:
        container = absio.container.get(container_id)
    except Exception as e:
        error('Unable to get container: {}'.format(e))
        if click.confirm('Would you like to try again?'):
            update()
        # There is nothing to edit. The old code fell through and used the
        # unbound name 'container' (after nesting another main_menu() call).
        return
    attrs = {
        'header': container.header.data,
        'content': container.content.data,
        'type': container.type,
        # Edited on a copy so that the comparison below can detect changes.
        'access': deepcopy(container.access),
    }
    while True:
        click.clear()
        warn('Updating container {}'.format(container.id))
        _container_attr_menu(attrs)
        info('\n U. Update it')
        info(' M. Main menu\n')
        c = getchar()
        if c == 'm':
            break
        if c == '1':
            attrs['header'] = _set_value('header', attrs['header'])
        elif c == '2':
            # _set_container_content() returns (content, filename). Storing the
            # whole tuple, as before, sent a tuple as the container content.
            attrs['content'], _ = _set_container_content(attrs['content'])
        elif c == '3':
            attrs['type'] = _set_value('type', attrs['type'])
        elif c == '4':
            attrs['access'] = _set_container_access(attrs['access'])
        elif c == 'u':
            try:
                header = json.loads(attrs['header'])
            except (TypeError, ValueError):
                # Not a JSON string (or not a string at all): use the raw value.
                header = attrs['header']
            kwargs = dict()
            if header != container.header.data:
                kwargs['header'] = header
            if attrs['content'] != container.content.data:
                kwargs['content'] = attrs['content']
            if attrs['type'] != container.type:
                kwargs['type'] = attrs['type']
            if attrs['access'] != container.access:
                kwargs['access'] = attrs['access']
            absio.container.update(container.id, **kwargs)
            success('Container {} has been updated.'.format(container.id))
            click.pause()
            break
def delete():
    """Prompt for a container ID and delete that container after confirmation.

    The success message is printed only when the deletion actually happened.
    Declining the confirmation, or a failed deletion that is not retried,
    returns to the caller without claiming success.
    """
    click.clear()
    warn('Delete a container.\n')
    container_id = click.prompt('Container ID', type=uuid.UUID)
    if not click.confirm('\nAre you sure you would like to delete container {}?'.format(container_id)):
        # Declined: previously this path still printed "has been deleted".
        return
    try:
        absio.container.delete(container_id)
    except Exception as e:
        error('Unable to delete container: {}'.format(e))
        if click.confirm('\nWould you like to try again?'):
            delete()
        # The retry, if any, reports its own outcome; this attempt failed.
        return
    success('\nContainer {} has been deleted.\n'.format(container_id))
    click.pause()
def events():
    """Fetch all container events and show them, one per line, in a pager."""
    click.clear()
    warn('All container events.\n')
    lines = [str(event) for event in absio.container.get_events()]
    click.echo_via_pager('\n'.join(lines))
def exit():
    """Clear the screen and terminate the program."""
    click.clear()
    # Equivalent to sys.exit() with no argument.
    raise SystemExit
def main_menu():
    """Show the top-level menu and run the action for the key that was pressed.

    Unrecognised keys redraw the menu. Returns once the chosen action has
    finished.
    """
    dispatch = {
        'c': create,
        'r': read,
        'u': update,
        'd': delete,
        'l': events,
        'q': exit,
    }
    # Loop rather than re-enter main_menu() on an unknown key, so repeated stray
    # key presses cannot grow the call stack.
    while True:
        click.clear()
        warn('Welcome to the Absio Container CRUD Sample App.\n')
        info('Choose your option:\n')
        menu('C', 'Create')
        menu('R', 'Read')
        menu('U', 'Update')
        menu('D', 'Delete\n')
        menu('L', 'List Events')
        menu('Q', 'Quit')
        handler = dispatch.get(getchar())
        if handler is not None:
            handler()
            return
@click.command()
@click.option('--api-key', required=True)
@click.option('--url', default='https://sandbox.absio.com')
@click.option('--user-id', required=True)
@click.option('--password', required=True)
@click.option('--backup-phrase', required=True)
def main(api_key, url, user_id, password, backup_phrase):
    # Program entry point: initialize the library, log in, then serve the main
    # menu until the user quits. (Kept as a comment rather than a docstring so
    # the generated --help output stays unchanged.)
    absio.initialize(api_key=api_key, server_url=url, app_name=APP_NAME)
    # Try logging in to the OFS before falling back to the server.
    try:
        absio.login(user_id, password)
    except Exception:
        # Was a bare "except:", which also retried after Ctrl-C / SystemExit.
        absio.login(user_id, password, backup_phrase)
    while True:
        try:
            main_menu()
        except click.Abort:
            # click.Abort (Ctrl-C at a prompt) subclasses RuntimeError. Let it
            # through so the user can leave instead of seeing "Unexpected error".
            raise
        except Exception as e:
            error('Unexpected error: {}\n'.format(e))
            click.pause()
# Start the interactive application only when this file is executed as a script.
if __name__ == '__main__':
    main()
Custom Provider¶
Curious how to create your own provider? Here’s an example that illustrates using the Absio Broker™ application for everything except the encrypted secured container content - the management of the actual encrypted content is now the responsibility of the library user.
Imagine that you have requirements where you cannot store your content in a cloud provider, or perhaps only within a specific cloud provider, or on a SAN, etc. This provider shows how you can quickly override the default behavior and you become responsible for managing content, while still fully leveraging the Absio Broker™ application for its user and key management system:
"""An example provider demonstrating managing yourself the storage of encrypted content.
The Absio Broker™ application and OFS are utilized to store users, keys, events, etc, but
the actual encrypted content is not stored in either of those locations. Instead, this provider
allows you to handle the data storage yourself.
"""
from types import SimpleNamespace
from absio.providers import server_cache_ofs
class ContainerProvider(object):
    """Container operations that keep the encrypted content out of the default provider.

    Everything is delegated to server_cache_ofs.container, but the container's
    data is detached before a save and re-attached from a caller-supplied value
    on a fetch.
    """
    # NOTE(review): the sibling provider reads plural attributes such as
    # server_cache_ofs.keys / .events / .users; confirm that the singular
    # 'container' attribute used here is the intended name.

    def get(self, container_id, data, *args, **kwargs):
        """Fetch a container, attach the caller's encrypted data and decrypt if possible.

        data is the encrypted content the caller stored itself. Decryption is
        attempted only when both data and container keys are present.
        """
        # This will return a container that's as populated as it can be.
        container = server_cache_ofs.container.get(container_id)
        assert container.encrypted
        container.data = data
        if container.data and container.container_keys:
            container.decrypt()
        return container

    def update_or_create(self, container, **kwargs):
        """Save a container through the default provider without its data.

        Returns the container produced by the delegate, with the original data
        attached again so the omission is transparent to the caller.
        """
        assert container.encrypted
        # Detach the data so that it is not stored on the server / ofs.
        data = container.data
        container.data = None
        try:
            stored = server_cache_ofs.container.update_or_create(container, **kwargs)
        finally:
            # Give the caller's object its data back even when the save fails;
            # previously a failure left it permanently stripped.
            container.data = data
        # Finally, restore the data so it's transparent to the caller.
        stored.data = data
        return stored

    def delete(self, container_id):
        """Delete the container through the default provider."""
        # This will delete the container keys & access info from the server and ofs.
        server_cache_ofs.container.delete(container_id)
class SelfManagedContentProvider(object):
    """Provider that reuses server_cache_ofs for everything except containers."""

    def __init__(self):
        # Containers are the one area with custom behavior...
        self.containers = ContainerProvider()
        # ...while keys, key files, events and users come straight from the
        # default provider.
        self.users = server_cache_ofs.users
        self.events = server_cache_ofs.events
        self.key_file = server_cache_ofs.key_file
        self.keys = server_cache_ofs.keys
        # Holds the currently logged-in user; None while logged out.
        self.session = SimpleNamespace(user=None)

    def initialize(self, *args, **kwargs):
        """Pass initialization straight through to the default provider."""
        server_cache_ofs.initialize(*args, **kwargs)

    def login(self, *args, **kwargs):
        """Log in through the default provider and remember the returned user."""
        self.session.user = server_cache_ofs.login(*args, **kwargs)
        return self.session.user

    def logout(self, *args, **kwargs):
        """Log out through the default provider, then forget the session user."""
        server_cache_ofs.logout(*args, **kwargs)
        self.session.user = None


# Ready-to-use instance of the provider.
self_managed = SelfManagedContentProvider()
So how would you go about using this provider? Like this:
with absio.providers.provider(self_managed):
    user = absio.login(user_id, password)
    # This container is created on the server and in the OFS, but the content is not stored
    # in either location.
    container = absio.container.create(content)
    # Now you, the app developer, decide where to store the data. Here's an example:
    path = '/tmp/{}'.format(container.id)
    # Context managers close the file handles; the bare open(...).write() and
    # open(...).read() calls left that to the garbage collector.
    with open(path, 'wb') as f:
        f.write(container.data)
    # Now to fetch the container back and merge it with your encrypted data:
    with open(path, 'rb') as f:
        data = f.read()
    container = absio.container.get(container.id, data=data)
Note that the storage and retrieval of your self-managed data could just as easily have been
accomplished within the implementation of the get() and update_or_create() methods.
This would reduce the sample usage to just the create() and get() calls.