Examples

Sometimes the easiest way to learn is by seeing the library in action. Here is a collection of fully working examples that demonstrate the flexibility and ease of use of the absio library.

Note

These examples require the installation of the Click library. The Content Sharing example additionally imports the arrow library.

User Manipulation

This tool allows you to create users and modify their credentials. Users can also be imported into new systems:

#!/usr/bin/env python
"""A sample application involving users.

You may create and delete users.  Additionally you may manage user's authentication and backup
credentials.
"""
import click
import absio
import logging
import sys
from functools import partial

# Application name handed to absio.initialize() by every command in this script.
APP_NAME = 'python-absio-user-cli'

# Console output helpers: click.secho pre-bound to one foreground color per severity.
error = partial(click.secho, fg='red')
warn = partial(click.secho, fg='yellow')
info = partial(click.secho, fg='white')
success = partial(click.secho, fg='green')


@click.group()
@click.option('--debug/--no-debug', default=False)
def cli(debug):
    # Root command group.  Without --debug there is nothing to set up.
    if not debug:
        return

    # With --debug, send the absio library's log records to the console at DEBUG level.
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    absio_log = logging.getLogger('absio')
    absio_log.setLevel(logging.DEBUG)
    absio_log.addHandler(console)


# A common set of options used for server specification.  They are attached to a command
# with apply_options() and arrive in the command function as ``api_key`` and ``url``.
server_options = (
    click.option('--api-key', required=True),
    click.option('--url', default='https://sandbox.absio.com'),
)

# Options used to log a specific account into the server.  They arrive in the command
# function as ``user_id``, ``password`` and ``backup_phrase``.
login_options = (
    click.option('--user-id', required=True),
    click.option('--password', required=True),
    click.option('--backup-phrase', required=True),
)


def apply_options(*options):
    """Bundle several decorators into a single decorator.

    The returned decorator applies ``options`` from last to first, so the first option
    ends up outermost -- the same result as stacking them with ``@`` in the given order.
    """
    def decorate(func):
        wrapped = func
        for option in options[::-1]:
            wrapped = option(wrapped)
        return wrapped

    return decorate


@cli.command()
@apply_options(*server_options)
@click.option('--password', required=True)
@click.option('--reminder', required=True)
@click.option('--backup-phrase', required=True)
def create(api_key, url, password, reminder, backup_phrase):
    """Registers a new user."""
    info('Creating user.')
    absio.initialize(api_key, app_name=APP_NAME, server_url=url)
    credentials = dict(password=password, reminder=reminder, passphrase=backup_phrase)
    try:
        new_user = absio.user.create(**credentials)
    except Exception as exc:
        # Any failure is reported in red and turned into a non-zero exit status.
        error('Failed to create user: {}'.format(exc))
        sys.exit(1)
    else:
        success('User created: {}'.format(new_user))


@cli.command()
@apply_options(*(server_options + login_options))
def delete(api_key, url, user_id, password, backup_phrase):
    """Permanently removes a user."""
    info('Deleting user.')
    absio.initialize(api_key, app_name=APP_NAME, server_url=url)
    try:
        # The account has to be logged in before it can be removed.
        absio.login(user_id, password=password, passphrase=backup_phrase)
        absio.user.delete()
    except Exception as exc:
        error('Failed to delete user: {}'.format(exc))
        sys.exit(1)
    else:
        success('Deleted user: {}'.format(user_id))


def _login(api_key, url, user_id, password, backup_phrase):
    """Initialize absio against ``url`` and log ``user_id`` in.

    Prints a status line before and after.  On any login failure the error is printed
    and the process exits with status 1.  Nothing is returned.
    """
    info('Logging in.')
    absio.initialize(api_key, app_name=APP_NAME, server_url=url)
    try:
        logged_in = absio.login(user_id, password=password, passphrase=backup_phrase)
    except Exception as exc:
        error('Failed to login user: {}'.format(exc))
        sys.exit(1)
    else:
        success('Successfully logged in user: {}'.format(logged_in))


@cli.command()
@apply_options(*(server_options + login_options))
def login(api_key, url, user_id, password, backup_phrase):
    """Verifies credentials by logging in."""
    # All the work (and the error handling) lives in the shared _login helper.
    _login(
        api_key=api_key,
        url=url,
        user_id=user_id,
        password=password,
        backup_phrase=backup_phrase,
    )


@cli.command()
@apply_options(*server_options)
@click.option('--user-id', required=True)
def getreminder(api_key, url, user_id):
    """Returns the publicly accessible reminder for the user's backup passphrase."""
    info('Fetching reminder information.')
    absio.initialize(api_key, app_name=APP_NAME, server_url=url)
    try:
        # No login happens here: only the server options and a user ID are required.
        hint = absio.user.get_backup_reminder(user_id)
    except Exception as exc:
        error('Failed to fetch reminder information: {}'.format(exc))
        sys.exit(1)
    else:
        success('Reminder information: {}'.format(hint))


@cli.command()
@apply_options(*(server_options + login_options))
@click.option('--new-password', required=True)
@click.option('--new-backup-phrase', required=True)
@click.option('--new-reminder', required=False)
def change_credentials(api_key, url, user_id, password, backup_phrase, new_password, new_backup_phrase, new_reminder):
    """Changes a user's password, backup phrase and reminder."""
    # The help text and status line now mention the password too, since
    # --new-password is required and is forwarded to the library below.
    info('Changing password, backup passphrase and reminder.')
    # _login exits the process itself if the current credentials are rejected.
    _login(api_key, url, user_id, password, backup_phrase)
    try:
        absio.user.change_credentials(
            password=new_password,
            passphrase=new_backup_phrase,
            reminder=new_reminder,
        )
    except Exception as e:
        error('Failed to change backup credentials: {e}'.format(e=e))
        sys.exit(1)
    success('Successfully changed password.')
    success('Successfully changed backup passphrase to {p}'.format(p=new_backup_phrase))
    # --new-reminder is optional; do not claim a change "to None" when it was omitted.
    if new_reminder is not None:
        success('Successfully changed backup reminder to {r}'.format(r=new_reminder))


# Run the command group only when executed as a script, not when imported.
if __name__ == '__main__':
    cli()

Content Sharing

Being able to securely and easily share content with other users is the main use case for using the absio library. This tool allows you to select local files and recipients with whom they should be shared:

#!/usr/bin/env python
"""A sample application involving containers.

You may create, update, and delete containers.
"""
import arrow
import absio
import click
import json
import os
import pprint
import sys
import tempfile
import uuid
from copy import deepcopy
from functools import partial, reduce

# Application name handed to absio.initialize() in main().
APP_NAME = 'python-absio-container-cli'

# Console output helpers: click.secho pre-bound to one foreground color per severity.
error = partial(click.secho, fg='red')
warn = partial(click.secho, fg='yellow')
info = partial(click.secho, fg='white')
success = partial(click.secho, fg='green')

# Container attributes printed by read(), in display order.  Dotted names such as
# 'header.data' are resolved with rgetattr().
container_attrs = [
    'access',
    'content',
    'created_at',
    'created_by',
    'encrypted',
    'header.data',
    'id',
    'modified_at',
    'modified_by',
    'type',
]


# Dotted attribute paths on a Permissions object that _set_permissions() asks about,
# one yes/no question per entry.
access_perms = [
    'access.view',
    'access.modify',
    'container.download',
    'container.decrypt',
    'container.upload',
    'container.type.view',
    'container.type.modify',
]


def rsetattr(obj, attr, val):
    """Set a possibly dotted attribute path (e.g. ``'a.b.c'``) on ``obj`` to ``val``.

    Everything before the last dot is looked up with rgetattr(); the final name is
    then assigned on that object.  Returns whatever ``setattr`` returns (``None``).
    """
    parent_path, _, leaf = attr.rpartition('.')
    if parent_path:
        target = rgetattr(obj, parent_path)
    else:
        target = obj
    return setattr(target, leaf, val)


# Marker meaning "no default supplied"; lets callers pass None as a real default.
sentinel = object()


def rgetattr(obj, attr, default=sentinel):
    """Resolve a possibly dotted attribute path (e.g. ``'a.b.c'``) starting at ``obj``.

    Without ``default`` a missing attribute raises AttributeError.  With ``default``
    every step of the walk falls back to ``default`` when its attribute is missing.
    """
    current = obj
    for name in attr.split('.'):
        if default is sentinel:
            current = getattr(current, name)
        else:
            current = getattr(current, name, default)
    return current


def getchar():
    """A wrapper for click's getchar function.

    This is due to: https://github.com/pallets/click/issues/822

    Returns the pressed key as lower-cased text, decoding it first when click hands
    back bytes.
    """
    c = click.getchar()
    if isinstance(c, bytes):
        # sys.stdin.encoding may exist but be None; fall back in that case as well,
        # otherwise decode() would be called with None and raise TypeError.
        enc = getattr(sys.stdin, 'encoding', None) or 'cp1252'
        c = c.decode(enc, 'replace')
    # Lower-case after decoding so the result is always text-lowered, not byte-lowered.
    return c.lower()


def _set_value(name, current):
    """Clear the screen, show the ``current`` value of ``name`` and prompt for a new one.

    Returns the value entered at the prompt.
    """
    click.clear()
    heading = 'Set Container {}\n'.format(name.capitalize())
    warn(heading)
    info('Current: {}\n'.format(current))
    return click.prompt('New {}'.format(name.lower()))


def _pprint_access(access, title='Access', menunum=None):
    """Print an access setting under a green ``title``.

    ``access`` may be None (printed as "(None)"), a dict of user ID -> object with
    ``expiration`` and ``permissions`` attributes, or a list of user IDs.  When
    ``menunum`` is truthy it is shown in front of the title as a menu number.
    """
    heading = click.style(title, fg='green')
    if menunum:
        heading = click.style('  {}.  '.format(menunum), fg='blue', bold=True) + heading
    # No newline yet: the branches below finish the heading line.
    click.echo(heading, nl=False)

    def field(label, value):
        click.echo(click.style(label, fg='green') + ': {}'.format(value))

    if access is None:
        info(' (None)')
        return

    if isinstance(access, dict):
        info('')
        for user_id, entry in access.items():
            field('       User ID', user_id)
            field('          Expiration', entry.expiration)
            field('          Permissions', entry.permissions)
        return

    if isinstance(access, list):
        info('')
        for user_id in access:
            field('       User ID', user_id)


def menu(num, opt):
    """Print one menu line: the key ``num`` in bold blue followed by the label ``opt``."""
    key = click.style('  {}'.format(num), fg='blue', bold=True)
    label = '.  {}'.format(opt)
    click.echo(key + label)


def _container_attr_menu(attrs):
    """Print the numbered list of editable container attributes with current values.

    ``attrs`` is a dict with the keys 'header', 'content', 'type' and 'access'.
    """
    def row(number, label, value):
        bullet = click.style('  {}.  '.format(number), fg='blue', bold=True)
        click.echo(bullet + click.style(label, fg='green') + ' ({})'.format(value))

    info('\nItem:\n')
    row(1, 'Header', attrs['header'])

    # Content longer than 33 items is shortened to its first 30 plus an ellipsis.
    preview = attrs['content']
    if preview is not None and len(preview) > 33:
        preview = preview[:30] + b'...'
    row(2, 'Content', preview)

    row(3, 'Type', attrs['type'])
    _pprint_access(attrs['access'], title='Access', menunum=4)


def _set_container_content(existing):
    """Interactively choose the container content.

    Returns a tuple of ``(content, filename)``.  ``filename`` is the base name of the
    loaded file when the content came from a file and ``None`` otherwise.  Choosing
    "Done" returns ``existing`` unchanged.
    """
    while True:
        click.clear()
        warn('Set container content:\n')
        if existing is not None:
            data = (existing[:30] + b'...') if len(existing) > 33 else existing
        else:
            data = None
        click.echo(click.style('Current Content', fg='green') + ': ({})\n'.format(data))
        menu(1, 'Raw Input.')
        menu(2, 'From File.')
        menu('D', 'Done.\n')
        c = getchar()
        if c == 'd':
            return (existing, None)
        if c == '1':
            value = click.prompt('Enter container data')
            return (value.encode('utf-8'), None)
        if c == '2':
            fp = click.prompt('Enter file to load', type=click.File('rb'))
            try:
                return (fp.read(), os.path.basename(fp.name))
            finally:
                # The prompt hands back an open file object, so close it here instead of
                # leaking the handle.  NOTE(review): '-' presumably maps to standard
                # input, which must stay open for later prompts -- confirm the name check.
                if getattr(fp, 'name', None) != '<stdin>':
                    fp.close()


def _set_container_access(existing):
    """Interactively choose the access setting for a container.

    Returns ``None`` for default access, a list of user-ID strings for option 2, the
    result of the advanced editor for option 3, or ``existing`` unchanged on "Done".
    """
    while True:
        click.clear()
        warn('Set container access:\n')
        _pprint_access(existing, title='Current Access')
        menu('\n  1', 'Default access.')
        menu(2, 'By user ID(s).')
        menu(3, 'Advanced setting.')
        menu('\n  D', 'Done.')
        c = getchar()
        if c == '1':
            return None
        elif c == '2':
            while True:
                value = click.prompt('Enter comma seperated User IDs')
                # Verify that we have a list of UUIDs
                try:
                    return [str(uuid.UUID(val.strip())) for val in value.split(',')]
                except ValueError:
                    # uuid.UUID rejects malformed strings with ValueError; ask again.
                    error('Value must be comma seperated UUID strings.')
        elif c == '3':
            return _set_advanced_container_access(existing)
        elif c == 'd':
            # "Done" keeps the current setting, matching the content menu, instead of
            # silently resetting it to the default.
            return existing


def _set_individual_access(user_id, existing=None):
    """Interactively edit one user's expiration and permissions.

    ``existing`` may be an object with ``expiration`` and ``permissions`` attributes
    whose values seed the editor.  Returns a new ``absio.access.Access`` for
    ``user_id`` built from the final values.
    """
    if existing is None:
        existing = dict(expiration=None, permissions=None)
    else:
        existing = dict(expiration=existing.expiration, permissions=existing.permissions)

    while True:
        click.clear()
        warn('Setting individual user access for {}:\n'.format(user_id))
        info('  1. Expiration ({})'.format(existing['expiration']))
        info('  2. Permissions ({})'.format(existing['permissions']))
        info('  D. Done')
        c = getchar()
        if c == '1':
            while True:
                # An empty answer clears the expiration.
                value = click.prompt('Expiration', default='')
                if value == '':
                    existing['expiration'] = None
                    break
                try:
                    existing['expiration'] = arrow.get(value).datetime
                except Exception:
                    # Only ordinary errors mean "unparseable"; a bare except would also
                    # swallow KeyboardInterrupt and SystemExit.
                    error('Unable to convert to time, try again.')
                else:
                    break
        elif c == '2':
            existing['permissions'] = _set_permissions(user_id)
        elif c == 'd':
            break
    # Now that the user is finished fine tuning the params, we can construct an access obj.
    access = absio.access.Access(user_id=user_id, expiration=existing['expiration'], permissions=existing['permissions'])
    return access


def _prompt_existing_user_id(access):
    """Prompt until the entered UUID matches a key of ``access``; return that key."""
    # Compare on the string form so both str and uuid.UUID keys can be matched.
    known = {str(key): key for key in access}
    while True:
        entered = str(click.prompt('Enter User ID', type=uuid.UUID))
        if entered in known:
            return known[entered]
        error('No access entry for user {}.'.format(entered))


def _set_advanced_container_access(existing):
    """Interactively add, edit and remove per-user access entries.

    A dict passed as ``existing`` is edited in place; any other value starts from an
    empty dict.  Returns the dict of user ID -> Access.
    """
    if isinstance(existing, dict):
        access = existing
    else:
        # If existing access isn't a dict, that means it's either None (default) or a list.  Neither
        # of which matter when configuring advanced access.
        access = dict()

    while True:
        click.clear()
        warn('Setting advanced access:\n')
        # Show the dict being edited so additions and removals are visible at once.
        _pprint_access(access if access else None, title='Current Access')
        info('\n  1. Add User')
        info('  2. Edit User')
        info('  3. Remove User')
        info('  D. Done\n')
        c = getchar()
        if c == '1':
            user_id = str(click.prompt('Enter User ID', type=uuid.UUID))
            access[user_id] = _set_individual_access(user_id)
        elif c in ('2', '3'):
            if not access:
                # Nothing to pick from; prompting here could never succeed.
                error('There are no users to select yet.')
                click.pause()
                continue
            user_id = _prompt_existing_user_id(access)
            if c == '2':
                access[user_id] = _set_individual_access(user_id, access[user_id])
            else:
                access.pop(user_id)
        elif c == 'd':
            break
    return access


def _set_permissions(user_id):
    """Ask a yes/no question per entry of ``access_perms`` and return the Permissions.

    Starts from a fresh ``absio.permissions.Permissions()`` and sets a permission to
    False only when its question is answered with "no" (the default answer is "yes").
    """
    click.clear()
    warn('Setting permissions for {}\n'.format(user_id))
    perms = absio.permissions.Permissions()
    for name in access_perms:
        granted = click.confirm('  ' + name, default=True)
        if granted:
            continue
        rsetattr(perms, name, False)
    return perms


def create():
    """Interactive screen that collects header, content, type and access, then creates
    a container.  Returns when the container was created or "Main menu" is chosen.
    """
    # Where we'll store the currently selected info prior to creation.
    attrs = {
        'header': None,
        'content': None,
        'type': None,
        'access': None,
    }

    while True:
        click.clear()
        warn('Create a container.')
        _container_attr_menu(attrs)
        menu('\n  C', 'Create it')
        menu('M', 'Main menu\n')
        c = getchar()
        if c == 'm':
            break
        elif c == '1':
            attrs['header'] = _set_value('header', attrs['header'])
        elif c == '2':
            attrs['content'], filename = _set_container_content(attrs['content'])
            if filename:
                # Content loaded from a file: mark it as such and keep the file name.
                attrs['type'] = 'File'
                attrs['header'] = filename
        elif c == '3':
            attrs['type'] = _set_value('Type', attrs['type'])
        elif c == '4':
            attrs['access'] = _set_container_access(attrs['access'])

        elif c == 'c':
            # Allow string input of the JSON header ...
            try:
                header = json.loads(attrs['header'])
            except (TypeError, ValueError):
                # ... TypeError for a header that is not text (e.g. None), ValueError
                # for text that is not JSON; either way the raw value is used.
                header = attrs['header']

            kwargs = {
                'header': header,
                'content': attrs['content'],
                'type': attrs['type'],
                'access': attrs['access'],
            }
            container = absio.container.create(**kwargs)
            success('\nCreated ' + str(container) + '\n')
            click.pause()
            break


def read():
    """Prompt for a container ID, fetch the container and print its attributes.

    Content of a 'File' container is written to a temporary file whose path is shown;
    that file is removed again once the user continues.
    """
    click.clear()
    warn('Read a container.\n')
    container_id = click.prompt('Container ID', type=uuid.UUID)
    try:
        container = absio.container.get(container_id)
    except Exception as e:
        error('Unable to get container: {}'.format(e))
        if click.confirm('Would you like to try again?'):
            read()
        # Return to the caller (the menu loop) rather than falling through with no
        # container bound.
        return
    success('\n' + str(container) + '\n')
    is_file = container.type == 'File'
    filename = None
    try:
        for attr in container_attrs:
            if attr == 'access':
                click.echo(click.style('{:>12}'.format(attr), fg='green') + ':')
                for recip, access in getattr(container, attr).items():
                    click.echo('{}: {}'.format(recip, access))
                    click.echo(click.style('                          Created By', fg='magenta') + ': {}'.format(pprint.pformat(access.created_by)))
                    click.echo(click.style('                          Created At', fg='magenta') + ': {}'.format(access.created_at))
                    click.echo(click.style('                         Modified By', fg='magenta') + ': {}'.format(pprint.pformat(access.modified_by)))
                    click.echo(click.style('                         Modified At', fg='magenta') + ': {}'.format(access.modified_at))
            elif attr == 'content':
                content = container.content.data
                if is_file and content is not None:
                    # Write it out to a tmp file and display the file system location.
                    # basename() keeps the file inside the temp directory even if the
                    # header contains path separators.
                    filename = os.path.join(tempfile.gettempdir(), os.path.basename(container.header.data))
                    with open(filename, 'wb') as f:
                        f.write(content)
                    content = filename
                elif content is not None and len(content) > 33:
                    content = content[:30] + b'...'
                click.echo(click.style('{:>12}'.format(attr), fg='green') + ': ' + str(content))
            else:
                click.echo(click.style('{:>12}'.format(attr), fg='green') + ': ' + str(rgetattr(container, attr)))
        info('')
        click.pause()
    finally:
        # Remove the temporary copy even when printing fails part-way through.
        if filename is not None and os.path.exists(filename):
            os.unlink(filename)


def update():
    """Prompt for a container ID, let the user edit its attributes and send only the
    changed ones to ``absio.container.update``.
    """
    click.clear()
    warn('Update a container.\n')
    container_id = click.prompt('Container ID', type=uuid.UUID)
    try:
        container = absio.container.get(container_id)
    except Exception as e:
        error('Unable to get container: {}'.format(e))
        if click.confirm('Would you like to try again?'):
            update()
        # Return to the caller (the menu loop) rather than falling through with no
        # container bound.
        return

    attrs = {
        'header': container.header.data,
        'content': container.content.data,
        'type': container.type,
        # Copy so that edits can be compared against the original below.
        'access': deepcopy(container.access),
    }
    while True:
        click.clear()
        warn('Updating container {}'.format(container.id))
        _container_attr_menu(attrs)
        info('\n  U. Update it')
        info('  M. Main menu\n')
        c = getchar()
        if c == 'm':
            break
        if c == '1':
            attrs['header'] = _set_value('header', attrs['header'])
        elif c == '2':
            # _set_container_content returns (content, filename); storing the whole
            # tuple would send a tuple as the container content.
            attrs['content'], filename = _set_container_content(attrs['content'])
            if filename:
                # Same convention as create(): file content sets type and header.
                attrs['type'] = 'File'
                attrs['header'] = filename
        elif c == '3':
            attrs['type'] = _set_value('type', attrs['type'])
        elif c == '4':
            attrs['access'] = _set_container_access(attrs['access'])

        elif c == 'u':
            # Allow string input of the JSON header; fall back to the raw value when it
            # is not text (TypeError) or not valid JSON (ValueError).
            try:
                header = json.loads(attrs['header'])
            except (TypeError, ValueError):
                header = attrs['header']

            kwargs = dict()
            if header != container.header.data:
                kwargs['header'] = header
            if attrs['content'] != container.content.data:
                kwargs['content'] = attrs['content']
            if attrs['type'] != container.type:
                kwargs['type'] = attrs['type']
            if attrs['access'] != container.access:
                kwargs['access'] = attrs['access']
            absio.container.update(container.id, **kwargs)
            success('Container {} has been updated.'.format(container.id))
            click.pause()
            break


def delete():
    """Prompt for a container ID and delete that container after confirmation.

    The success message is printed only when the delete call actually succeeded.
    """
    click.clear()
    warn('Delete a container.\n')
    container_id = click.prompt('Container ID', type=uuid.UUID)
    if not click.confirm('\nAre you sure you would like to delete container {}?'.format(container_id)):
        # Declined: go back to the caller (the menu loop) without reporting a deletion.
        return
    try:
        absio.container.delete(container_id)
    except Exception as e:
        error('Unable to delete container: {}'.format(e))
        if click.confirm('\nWould you like to try again?'):
            delete()
        # The retry, if any, already reported its own outcome.
        return
    success('\nContainer {} has been deleted.\n'.format(container_id))
    click.pause()


def events():
    """Show every event returned by ``absio.container.get_events()`` in a pager."""
    click.clear()
    warn('All container events.\n')
    lines = [str(event) for event in absio.container.get_events()]
    click.echo_via_pager('\n'.join(lines))


def exit():
    """Clear the screen and end the program by raising SystemExit."""
    click.clear()
    raise SystemExit


def main_menu():
    """Draw the top-level menu, read one key and run the matching screen.

    An unrecognized key shows the menu again.
    """
    entries = (
        ('C', 'Create', create),
        ('R', 'Read', read),
        ('U', 'Update', update),
        ('D', 'Delete\n', delete),
        ('L', 'List Events', events),
        ('Q', 'Quit', exit),
    )

    click.clear()
    warn('Welcome to the Absio Container CRUD Sample App.\n')
    info('Choose your option:\n')
    for key, label, _ in entries:
        menu(key, label)

    cmd = getchar()
    handlers = {key.lower(): handler for key, _, handler in entries}
    chosen = handlers.get(cmd, main_menu)
    chosen()


@click.command()
@click.option('--api-key', required=True)
@click.option('--url', default='https://sandbox.absio.com')
@click.option('--user-id', required=True)
@click.option('--password', required=True)
@click.option('--backup-phrase', required=True)
def main(api_key, url, user_id, password, backup_phrase):
    # Entry point: initialize the library, log in, then show the main menu until the
    # user quits (the Quit handler raises SystemExit, which the loop does not catch).

    absio.initialize(api_key=api_key, server_url=url, app_name=APP_NAME)
    # Try logging in to the OFS before falling back to the server.
    try:
        absio.login(user_id, password)
    except Exception:
        # Only ordinary errors trigger the fallback; a bare except would also swallow
        # KeyboardInterrupt and SystemExit.
        absio.login(user_id, password, backup_phrase)

    while True:
        try:
            main_menu()
        except Exception as e:
            error('Unexpected error: {}\n'.format(e))
            click.pause()


# Start the interactive app only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

Custom Provider

Curious how to create your own provider? Here’s an example that illustrates using the Absio Broker™ application for everything except the encrypted secured container content - the management of the actual encrypted content is now the responsibility of the library user.

Imagine that you have requirements where you cannot store your content in a cloud provider, or perhaps only within a specific cloud provider, or on a SAN, etc. This provider shows how you can quickly override the default behavior and you become responsible for managing content, while still fully leveraging the Absio Broker™ application for its user and key management system:

"""An example provider demonstrating managing yourself the storage of encrypted content.

The Absio Broker™ application and OFS are utilized to store users, keys, events, etc, but
the actual encrypted content is not stored in either of those locations.  Instead, this provider
allows you to handle the data storage yourself.
"""
from types import SimpleNamespace
from absio.providers import server_cache_ofs


class ContainerProvider(object):
    """Container provider that keeps the encrypted content out of the default provider.

    Everything except the container's ``data`` is delegated to
    ``server_cache_ofs.container``; the caller supplies and receives ``data`` itself.
    """

    def get(self, container_id, data, *args, **kwargs):
        """Fetch the container and attach the caller-supplied encrypted ``data``.

        The container is decrypted when both ``data`` and container keys are present.
        """
        # This will return a container that's as populated as it can be.
        container = server_cache_ofs.container.get(container_id)
        assert container.encrypted

        container.data = data

        if container.data and container.container_keys:
            container.decrypt()

        return container

    def update_or_create(self, container, **kwargs):
        """Store the container through the default provider without its ``data``.

        Returns the container handed back by the default provider, with ``data``
        attached again.
        """
        assert container.encrypted
        # Set the data aside so that it is not stored on the server / ofs.
        data = container.data
        container.data = None

        try:
            # Store everything but the data through the default provider.
            stored = server_cache_ofs.container.update_or_create(container, **kwargs)
        finally:
            # Give the caller's object its data back even when the call above raises;
            # otherwise a failed save would leave it stripped of its content.
            container.data = data

        # Finally, restore the data so it's transparent to the caller.
        stored.data = data
        return stored

    def delete(self, container_id):
        """Delete the container through the default provider."""
        # This will delete the container keys & access info from the server and ofs.
        server_cache_ofs.container.delete(container_id)


class SelfManagedContentProvider(object):
    """Provider that reuses ``server_cache_ofs`` for everything except containers."""

    def __init__(self):
        self.session = SimpleNamespace(user=None)
        # We use the default behavior of the default provider...
        for delegated in ('keys', 'key_file', 'events', 'users'):
            setattr(self, delegated, getattr(server_cache_ofs, delegated))
        # ...for everything except for containers, at which point our custom behavior takes over.
        self.containers = ContainerProvider()

    def initialize(self, *args, **kwargs):
        """Pass initialization straight through to the default provider."""
        server_cache_ofs.initialize(*args, **kwargs)

    def login(self, *args, **kwargs):
        """Log in through the default provider and remember the user on the session."""
        self.session.user = server_cache_ofs.login(*args, **kwargs)
        return self.session.user

    def logout(self, *args, **kwargs):
        """Log out through the default provider and clear the session user."""
        server_cache_ofs.logout(*args, **kwargs)
        self.session.user = None


# Module-level provider instance, ready to be handed to absio.providers.provider().
self_managed = SelfManagedContentProvider()

So how would you go about using this provider? Like this:

with absio.providers.provider(self_managed):
    user = absio.login(user_id, password)
    # This container is created on the server and in the OFS, but the content is not stored
    # in either location.
    container = absio.container.create(content)
    # Now you, the app developer, decide where to store the data.  Here's an example
    # (the with-blocks make sure each file handle is closed again):
    path = '/tmp/{}'.format(container.id)
    with open(path, 'wb') as f:
        f.write(container.data)

    # Now to fetch the container back and merge it with your encrypted data:
    with open(path, 'rb') as f:
        data = f.read()
    container = absio.container.get(container.id, data=data)

Note that the storage and retrieval of your self-managed data could just as easily have been accomplished within the implementation of the get() and update_or_create() methods. This would reduce the sample usage to just the create() and get().