import os
import glob
from irods.column import Criterion
from irods.exception import (CollectionDoesNotExist, OperationNotSupported,
MultipleResultsFound)
from irods.keywords import FORCE_FLAG_KW
from irods.meta import iRODSMeta
from irods.models import Collection, DataObject
from vsc_irods.manager import Manager
# Job-related environment variables used by add_job_metadata
job_env_var = ['PBS_O_HOST', 'PBS_JOBID', 'PBS_JOBNAME', 'PBS_NODEFILE',
'SLURM_SUBMIT_HOST', 'SLURM_JOB_ID', 'SLURM_JOB_NAME',
'SLURM_JOB_NODELIST']
def confirm(operation, kind, item):
""" Prompts the users to confirm the given operation """
answer = None
while answer not in ['y', 'n']:
answer = input('OK to %s %s %s? [y/n] ' % (operation, kind, item))
return answer == 'y'
[docs]class BulkManager(Manager):
""" A class for easier 'bulk' operations with the iRODS file system """
[docs] def remove(self, iterator, recurse=False, force=False, interactive=False,
verbose=False, **options):
""" Remove iRODS data objects and/or collections,
in a manner that resembles the UNIX 'rm' command.
Examples:
>>> session.bulk.remove('tmpdir*', recurse=True)
>>> session.bulk.remove('~/molecule_database/*.xyz')
Arguments:
iterator: iterator or str
Defines which items are subject to the bulk operation.
Can be an iterator (e.g. using search_manager.find())
or a string (which will be used to construct a
search_manager.iglob() iterator). Matching data objects
(and, if used recursively, collections) will be removed.
recurse: bool (default: False)
Whether to use recursion, meaning that also matching collections
and their data objects and subcollections will be removed.
force: bool (default: False)
Whether to imediately remove the collections or data objects,
without putting them in the trash.
interactive: bool (default: False)
Whether to prompt for permission before every removal.
verbose: bool (default: False)
Whether to print more output.
options: (any remaining keywords arguments)
Additional options to be passed on to PRC's
collections.remove() and data_objects.unlink() methods.
"""
if isinstance(iterator, str):
iterator = self.session.search.iglob(iterator)
for item in iterator:
path = self.session.path.get_absolute_irods_path(item)
if self.session.collections.exists(path):
# Item is a collection, not an object
if recurse:
ok = confirm('remove', 'collection', path) if interactive \
else True
if ok:
self.log('Removing collection %s' % path, verbose)
self.session.collections.remove(path, recurse=True,
force=force, **options)
else:
self.log('Skipping collection %s (no recursion)' % item,
verbose)
else:
ok = confirm('remove', 'object', path) if interactive else True
if ok:
self.log('Removing object %s' % path, verbose)
self.session.data_objects.unlink(path, force=force,
**options)
[docs] def move(self, iterator, irods_path, clobber=True, interactive=False,
verbose=False):
""" Moving or renaming iRODS data objects and/or collections,
similar to the UNIX `mv` command.
Raises an CollectionDoesNotExist if the iterator corresponds to more
than one item and the irods_path destination does not correspond to an
existing collection.
Examples:
>>> session.bulk.move('tmpfiles*', '~/tmpdir/', verbose=True)
>>> session.bulk.move('./parent/dirname', './parent/dirname_new')
Arguments:
iterator: iterator or str
Defines which items are subject to the bulk operation.
Can be an iterator (e.g. using search_manager.find())
or a string (which will be used to construct a
search_manager.iglob() iterator). Matching data objects
and collections will be moved to the new path.
irods_path: str (default: '.')
The (absolute or relative) path on the local file system
where the data objects and collections will moved to.
clobber: bool (default: True)
Whether to overwrite existing data objects.
interactive: bool (default: False)
Whether to prompt for permission before overwriting
existing data objects. If True, the value of the 'clobber'
argument is ignored.
verbose: bool (default: False)
Whether to print more output.
"""
def move_one(src, dest):
# Move/renames a single item
src_abs = self.session.path.get_absolute_irods_path(src)
dest_abs = self.session.path.get_absolute_irods_path(dest)
src_is_collection = self.session.collections.exists(src_abs)
dest_is_object = self.session.data_objects.exists(dest_abs)
kind = 'collection' if src_is_collection else 'data object'
ok = True
if not clobber:
ok = not dest_is_object
if interactive:
ok = confirm('move', kind, src_abs + ' to ' + dest_abs)
if ok:
self.log('Moving %s %s to destination %s' % \
(kind, src_abs, dest_abs), verbose)
if src_is_collection:
if dest_is_object:
msg = 'Cannot overwrite obj %s with coll %s'
raise OperationNotSupported(msg % (dest_abs, src_abs))
else:
self.session.collections.move(src_abs, dest_abs)
else:
self.session.data_objects.move(src_abs, dest_abs)
else:
self.log('Skipped moving %s %s to destination %s' % \
(kind, src_abs, dest_abs), verbose)
if isinstance(iterator, str):
iterator = self.session.search.iglob(iterator)
try:
previous_item = next(iterator)
except StopIteration:
raise StopIteration('Iterator yields no objects or collections')
dest = self.session.path.get_absolute_irods_path(irods_path)
item = None
for item in iterator:
# There is more than one item, so irods_path needs
# to be an existing collection
if not self.session.collections.exists(dest):
raise CollectionDoesNotExist(dest)
move_one(previous_item, dest)
previous_item = item
if item is not None:
move_one(item, dest)
else:
# The iterator only held 1 item originally,
# and it hasn't been processed yet
move_one(previous_item, dest)
return
[docs] def get(self, iterator, local_path='.', recurse=False, clobber=True,
interactive=False, return_data_objects=False, verbose=False,
**options):
""" Copy iRODS data objects and/or collections to the local machine.
Examples:
>>> session.bulk.get('tmpdir*', recurse=True)
>>> session.bulk.get('~/irods_db/*.xyz', local_path='./local_db')
Arguments:
iterator: iterator or str
Defines which items are subject to the bulk operation.
Can be an iterator (e.g. using search_manager.find())
or a string (which will be used to construct a
search_manager.iglob() iterator). Matching data objects
(and, if used recursively, collections) will be copied
to the local machine.
local_path: str (default: '.')
The (absolute or relative) path on the local file system
where the data objects and collections will be copied to.
recurse: bool (default: False)
Whether to use recursion, meaning that also matching collections
and their data objects and subcollections will be copied.
clobber: bool (default: True)
Whether to overwrite existing local files.
interactive: bool (default: False)
Whether to prompt for permission before overwriting
existing local files. If True, the value of the 'clobber'
argument is ignored.
return_data_objects: (default: False)
Whether to return a list of iRODSDataObject instances
of the uploaded data objects, instead of downloading
them to the local file system. If True, the 'clobber'
and 'interactive' arguments are ignored.
verbose: bool (default: False)
Whether to print more output.
options: (any remaining keywords arguments)
Additional options to be passed on to PRC's
data_objects.get() method.
"""
if isinstance(iterator, str):
iterator = self.session.search.iglob(iterator)
if not return_data_objects and not os.path.isdir(local_path):
raise OSError('Destination %s does not exist' % local_path)
objects = []
for item in iterator:
path = self.session.path.get_absolute_irods_path(item)
if self.session.collections.exists(path):
# Item is a collection, not an object
if recurse:
d = os.path.join(local_path, os.path.basename(item))
if not os.path.exists(d) and not return_data_objects:
self.log('Creating directory: %s' % d, verbose)
os.mkdir(d)
self.get(item + '/*', local_path=d, recurse=True,
clobber=clobber, interactive=interactive,
return_data_objects=return_data_objects,
verbose=verbose, **options)
else:
self.log('Skipping collection %s (no recursion)' % item,
verbose)
else:
if return_data_objects:
self.log('Getting object %s' % item, verbose)
obj = self.session.data_objects.get(path, file=None,
**options)
objects.append(obj)
else:
name = os.path.basename(path)
file_exists = os.path.exists(os.path.join(local_path, name))
ok = True
if not clobber:
ok = not file_exists
if interactive:
ok = confirm('get', 'object',
path +' to destination ' + local_path)
if ok:
self.log('Getting object %s to destination %s' % \
(path, local_path), verbose)
extra_options = {FORCE_FLAG_KW: ''}
obj = self.session.data_objects.get(path,
file=local_path,
**extra_options,
**options)
else:
self.log('Skipped getting object %s to destination %s' \
% (path, local_path), verbose)
if return_data_objects:
return objects
[docs] def put(self, iterator, irods_path='.', recurse=False, clobber=True,
interactive=False, verbose=False, create_options={}, **options):
""" Copy local files and/or folders to the iRODS server,
in a manner that resembles the UNIX 'cp' command.
Examples:
>>> session.bulk.put('tmpdir*', recurse=True)
>>> session.bulk.put('~/local_db/*.xyz', irods_path='./irods_db/')
Arguments:
iterator: iterator or str
Defines which items are subject to the bulk operation.
Can be an iterator (e.g. using search_manager.find())
or a string (which will be used to construct a
in search_manager.iglob() iterator). Matching files on the
local machine (and, if used recursively, directories) will
be copied to the iRODS server.
irods_path: str (default: '.')
The (absolute or relative) path on the iRODS file system
where the local files and folders will be copied to.
recurse: bool (default: False)
Whether to use recursion, meaning that also matching folders
and their files and subfolders will be copied to the iRODS server.
clobber: bool (default: True)
Whether to overwrite existing data objects.
interactive: bool (default: False)
Whether to prompt for permission before overwriting
existing data objects. If True, the value of the 'clobber'
argument is ignored.
verbose: bool (default: False)
Whether to print more output.
create_options: dict (default: {})
Additional options to be passed on to PRC's
collections.create() method.
options: (any remaining keywords arguments)
Additional options to be passed on to PRC's
data_objects.put() method.
"""
if type(iterator) is str:
iterator = glob.iglob(iterator)
dest = self.session.path.get_absolute_irods_path(irods_path)
if not self.session.collections.exists(dest):
raise CollectionDoesNotExist(dest)
for item in iterator:
local_path = item.rstrip('/')
path = os.path.join(dest, os.path.basename(local_path))
if os.path.isdir(local_path):
if recurse:
if not self.session.collections.exists(path):
self.log('Creating collection: %s' % path, verbose)
self.session.collections.create(path, recurse=True,
**create_options)
self.put(local_path + '/*', irods_path=path, recurse=True,
clobber=clobber, interactive=interactive,
create_options=create_options, verbose=verbose,
**options)
else:
self.log('Skipping collection %s (no recursion)' % \
local_path, verbose)
elif os.path.isfile(local_path):
object_exists = self.session.data_objects.exists(path)
ok = True
if not clobber:
ok = not object_exists
if interactive:
ok = confirm('put', 'file',
local_path +' in collection ' + dest)
if ok:
self.log('Putting file %s in collection %s' % \
(local_path, dest), verbose)
self.session.data_objects.put(local_path, dest + '/',
**options)
else:
self.log('Skipped putting file %s in collection %s' % \
(local_path, dest), verbose)
[docs] def size(self, iterator, recurse=False, verbose=False):
""" Yields (path, size-in-bytes) tuples for the selected data
objects and collections.
Examples:
>>> session.bulk.size('~/data/out*.txt')
>>> session.bulk.size('./data', recurse=True)
Arguments:
iterator: iterator or str
Defines which items are subject to the bulk operation.
Can be an iterator (e.g. using search_manager.find())
or a string (which will be used to construct a
search_manager.iglob() iterator). Data sizes will be returned
for matching data objects and, if used recursively, collections.
recurse: bool (default: False)
Whether to use recursion, meaning that the data size of
matching collections will be calculated as the sum of
their data objects and subcollection sizes.
verbose: bool (default: False)
Whether to print more output.
"""
if isinstance(iterator, str):
iterator = self.session.search.iglob(iterator)
for item in iterator:
path = self.session.path.get_absolute_irods_path(item)
if self.session.collections.exists(path):
if recurse:
new_iterator = self.size(item + '/*', recurse=True,
verbose=verbose)
size = sum([result[1] for result in new_iterator])
else:
self.log('Skipping collection %s (no recursion)' % item,
verbose)
continue
else:
dirname = os.path.dirname(path)
basename = os.path.basename(path)
criteria = [Criterion('=', Collection.name, dirname),
Criterion('=', DataObject.name, basename)]
fields = [DataObject.size]
q = self.session.query(*fields).filter(*criteria)
results = [result for result in q.get_results()]
if len(results) > 1:
raise MultipleResultsFound('Different replicas of data ' + \
'object %s have different sizes' % path)
size = results[0][DataObject.size]
yield (item, size)