Alican Posted February 9, 2018 Report Share Posted February 9, 2018 Hello everybody! This action populates some custom attributes within a Task that is nested under selected item. These attributes are paths to working directories and render outputs.. I have other actions that rely on these custom paths to do other things such as opening a shot with quicktime / launching the shot folder in explorer /etc.. Everything works as it should.... but I am concerned about the efficiency and speed of my code as populating each task takes about a second... and of course, the more tasks nested under my selection, the longer this process takes. Ideally... I would like to run this action at Project or at least Sequence level and let it update the custom attributes for all tasks nested under it. EpisodicS2└-- Episode1 └-- Seq001 └-- Shot1 └-- Task1 └-- Task2 └-- Task..10 Am I doing everything right? Are there major deficiencies in my code? Thank you for all the help! Ali import logging import ftrack_api import os import re import ftrack import getpass import time os.environ['FTRACK_SERVER'] = ### os.environ['FTRACK_API_KEY'] = ### #this script will populate parameters of each task and shot nested under selected item with direct paths to file locations. ### prerequisites: #Task should have the following attributes : path, out_path, base_path #Shot should have the following attributes : plate_path #Project should have the following attributes : Project_Path #<----naming will need to be conformed ### class UpdatePaths(object): '''Custom action.''' label = 'Update Paths' identifier = 'update.task.paths' description = 'This updates the path parameter of all tasks nested under selected' icon = 'https://d30y9cdsu7xlg0.cloudfront.net/png/1406969-200.png' def __init__(self, session): '''Initialise action.''' super(UpdatePaths, self).__init__() self.session = session self.logger = logging.getLogger( __name__ + '.' + self.__class__.__name__ ) def register(self): '''Register action.''' try: ftrack.EVENT_HUB.subscribe( 'topic=ftrack.action.discover and source.user.username={0}'.format( getpass.getuser() ), self.discover ) ftrack.EVENT_HUB.subscribe( 'topic=ftrack.action.launch and source.user.username={0} ' 'and data.actionIdentifier={1}'.format( getpass.getuser(), self.identifier ), self.launch ) except: self.session.event_hub.subscribe( 'topic=ftrack.action.discover', self.discover ) self.session.event_hub.subscribe( 'topic=ftrack.action.launch and data.actionIdentifier={0}'.format( self.identifier), self.launch ) def discover(self, event): '''Return action config if triggered on a single asset version.''' data = event['data'] # If selection contains more than one item return early since # this action can only handle a single version. selection = data.get('selection', []) self.logger.info('Got selection: {0}'.format(selection)) #if len(selection) != 1 or selection[0]['entityType'] != 'assetversion': # return return { 'items': [{ 'label': self.label, 'description': self.description, 'actionIdentifier': self.identifier, 'icon': self.icon }] } def launch(self, event): start_time = time.time() data = event['data'] selection = data.get('selection', []) entityTypes = ['Project', 'Episode', 'Sequence', 'Shot', 'Task'] session = ftrack_api.Session() for entity in selection: num = 0 et = entityTypes[num] item = session.query('select name from {0} where id is {1}'.format(et, entity['entityId'])).first() while item == None: num += 1 et = entityTypes[num] item = session.query('select name from {0} where id is {1}'.format(et, entity['entityId'])).first() prj_item_selected = False #If the current selected item is a project, first process it byitself. if item['parent'] == None: project = item prj_item_selected = True project_base = item['custom_attributes']['Project_Path']+item['full_name']+'/' if item['custom_attributes']['base_path']!= project_base: item['custom_attributes']['base_path'] = project_base #now look for the selected items descendants items = item['descendants'][:] #if the selected item is not a project, then we will also need to add taht to the list we will iterate through. if not prj_item_selected: items += [item] project = session.get('Project', item['project_id']) print 'Total nested items:',len(items) for i in items: link_list = i['link'][1:] episode = '' #search for an episode item in the links for l in link_list: link_item = session.get(l['type'], l['id']) if link_item['object_type']['name'] == 'Episode': episode = link_item['name'] project_path = project['custom_attributes']['Project_Path'] project_name = project['full_name'] base_path = project_path+project_name+'/'+episode+'/' base_path = base_path.replace('//','/') if i['custom_attributes']['base_path'] != base_path: i['custom_attributes']['base_path'] = base_path if i['object_type']['name'] == 'Task': shot_name = i['parent']['name'] #this may break when it comes to asset builds since they are not nested under a shot.. task_type = i['type']['name'] task_name = i['name'] path = base_path out_path = base_path if task_type in ['Compositing', 'Precomp', 'Cleanplate', 'Retime', 'Rotoscoping', 'Paintout']: comp_out_dir = '02_OUTPUT/03_comp' if task_type.lower() != task_name.lower(): comp_out_dir = '02_OUTPUT/01_precomp/{task_name}'.format(task_name=task_name) path = '{base_path}{dept_name}/{shot_name}/'.format(base_path=base_path,shot_name=shot_name,dept_name='05_COMP') out_path = '{base_path}{dept_name}/{shot_name}/{comp_out_dir}/'.format(base_path=base_path,shot_name=shot_name,dept_name='05_COMP',comp_out_dir=comp_out_dir) if task_type in ['Matchmove', 'Tracking']: path = '{base_path}{dept_name}/scenes/{shot_name}/tracking/'.format(base_path=base_path,shot_name=shot_name,dept_name='04_3D') out_path = '{base_path}{dept_name}/{shot_name}/TRAC/'.format(base_path=base_path,shot_name=shot_name,dept_name='06_RENDERS') if task_type in ['Animation']: path = '{base_path}{dept_name}/scenes/{shot_name}/anim/'.format(base_path=base_path,shot_name=shot_name,dept_name='04_3D') out_path = '{base_path}{dept_name}/{shot_name}/ANIM/'.format(base_path=base_path,shot_name=shot_name,dept_name='06_RENDERS') if task_type in ['Layout']: path = '{base_path}{dept_name}/scenes/{shot_name}/layout/'.format(base_path=base_path,shot_name=shot_name,dept_name='04_3D') out_path = '{base_path}{dept_name}/{shot_name}/LYT/'.format(base_path=base_path,shot_name=shot_name,dept_name='06_RENDERS') if task_type in ['Lighting']: path = '{base_path}{dept_name}/scenes/{shot_name}/lighting/'.format(base_path=base_path,shot_name=shot_name,dept_name='04_3D') out_path = '{base_path}{dept_name}/{shot_name}/FINL/'.format(base_path=base_path,shot_name=shot_name,dept_name='06_RENDERS') if task_type in ['FX']: path = '{base_path}{dept_name}/scenes/{shot_name}/fx/'.format(base_path=base_path,shot_name=shot_name,dept_name='04_3D') out_path = '{base_path}{dept_name}/{shot_name}/FX/'.format(base_path=base_path,shot_name=shot_name,dept_name='06_RENDERS') path = path.replace('//', '/') out_path = out_path.replace('//', '/') if i['custom_attributes']['path']!= path: #only make changes if they dont already exist i['custom_attributes']['path'] = path if i['custom_attributes']['out_path'] != out_path: #only make changes if they dont already exist i['custom_attributes']['out_path'] = out_path session.commit() print("--- %s seconds ---" % (time.time() - start_time)) return { 'success': True, 'message': 'updated task paths!' } def register(session, **kw): '''Register plugin.''' # Validate that session is an instance of ftrack_api.Session. If not, # assume that register is being called from an incompatible API # and return without doing anything. if not isinstance(session, ftrack_api.Session): # Exit to avoid registering this plugin again. return action = UpdatePaths(session) action.register() if __name__ == '__main__': logging.basicConfig(level=logging.INFO) session = ftrack_api.Session() register(session) # Wait for events. session.event_hub.wait() Link to comment Share on other sites More sharing options...
Erik Posted February 12, 2018 Report Share Posted February 12, 2018 I think you can speed this up greatly by collecting all the selected id's first and then querying for "id in ()" to get a collection instead of "id is " -- Erik Link to comment Share on other sites More sharing options...
Mattias Lagergren Posted February 12, 2018 Report Share Posted February 12, 2018 There are absolutely some speed improvements that can be done here. Mostly about reducing the number of queries to the server by using projections and some attributes that can make it more efficient to get the data that you're looking for. On 10/02/2018 at 12:41 AM, Alican said: session = ftrack_api.Session() It is strongly recommended that you do not connect to the event hub in the action launch callback Session(auto_connect_event_hub=False) as this will cause unnecessary stress on your server. Instead of looping the different entity types you can do the following: 'TypedContext where id is "{entityId}"'.format(...) Or if it is a project (you can find this in the selection data): 'Project where id is "{entityId}"'.format(...) If you want to quickly fetch all custom attributes (and other data) for the descendant tasks of an item it is faster to use projections: # Fetch all tasks on Project or another TypedContext parent. session.query('select custom_attributes, type.name, parent.name, name from Task where ancestors.id is "{0}" or project_id is "{1}"'.format(...)). Link to comment Share on other sites More sharing options...
Recommended Posts
Archived
This topic is now archived and is closed to further replies.