Jump to content

Krishna Jain

Members
  • Posts

    2
  • Joined

  • Last visited

Posts posted by Krishna Jain

  1. Hey guys,
    I'm trying out publishing with the python API to see if ftrack will work for our studio, but I'm running into an issue.

    Error uploading file: Failed to transfer component <FileComponent(7316ef1f-bc46-4845-9090-d738d4234dcd)> data to location <Location("studio.central-storage-location", e8f6d53f-f106-472e-9e8e-31cf50331150)> due to error:
    b'    'Component <dynamic ftrack FileComponent object 2665793305936> must be attached to a committed version and a committed asset with a parent context.
    b'    'Transferred component data that may require cleanup: []

    Is there something I'm missing? I've committed both after creating the asset and the version. The script creates the krj/test folder but nothing appears inside it. Here's the code:
     

    import ftrack_api
    import re
    import json
    import os
    from pha.ftrack.FtrackManager import FtrackManager as FTM
    
    def get_folder_or_rootfolder_by_name(parent, name, project=None):
        """Gets Folder (or RootFolder) by name
    
        Args: 
            parent (Project, Folder, or RootFolder): Parent folder to look within
            name (str): Name of the folder to find
            project (Project, optional): If provided and the folder isn't found, it will be created. Defaults to None.
    
        Returns:
            (Folder or RootFolder):
        """
        session = FTM().session
        query = f'RootFolder where parent_id is {parent["id"]}'
        folders = session.query(query).all()
        roots = [folder['name'] for folder in folders]
        folder = next((s for s in roots if s.lower() == name.lower()), None)
        if not folder:
            query = f'Folder where parent_id is {parent["id"]}'
            folders = session.query(query).all()
            roots = [folder['name'] for folder in folders]
            folder = next((s for s in roots if s.lower() == name.lower()), None)
        folder = [obj for obj in folders if obj['name'] == folder]
        if len(folder):
            folder = folder[0]
        elif project:
            folder = session.create('Folder', {
                'name': name,
                'project_id': project['id']
            })
            folder['parent'] = parent
        return folder
    
    def get_or_create_ftrack_folder(ftrack_path):
        if (isinstance(ftrack_path, str)):
            ftrack_path = re.split(r'[./\\]', ftrack_path)
        
        # get base folder
        project = FTM().session.query(f'Project where id is {FTM().get_project_id()}').first()
        folder = project
        while len(ftrack_path):
            folder = get_folder_or_rootfolder_by_name(folder, ftrack_path.pop(0), project)
        return folder
    
    def get_highest_version(versions):
        # Extract the version numbers from version names and find the highest one.
        version_numbers = [int(re.findall(r'\d+', version['name'])[0]) for version in versions]
        highest_version_number = max(version_numbers) if version_numbers else 0
    
        # Return the version object with the highest version number.
        return next((version for version in versions if int(re.findall(r'\d+', version['name'])[0]) == highest_version_number), None)
    
    def create_asset_version(asset, options):
        session = FTM().session
        # Get all versions of the asset.
        versions = asset['versions']
        
        if versions:
            # Get the highest version.
            highest_version = get_highest_version(versions)
    
            if highest_version:
                # Increment the highest version number and create a new version with the updated name.
                highest_version_number = int(re.findall(r'\d+', highest_version['name'])[0])
                new_version_number = highest_version_number + 1
            else:
                # If no highest version found, start with version 1.
                new_version_number = 1
    
            new_version_name = f'v{str(new_version_number).zfill(2)}'
            version = session.create('AssetVersion', {
                'name': new_version_name,
                'parent_id': asset['id']
            })
        else:
            # If no versions exist, create a new version with the provided version name.
            version = session.create('AssetVersion', {
                'name': 'v01',
                'parent_id': asset['id'],
                'task_id': options.get('task', None)
            })
        return version
    
    def save_thumbnail_image():
        '''Save thumbnail as temporary file and return file path.'''
        pass
    
    def publish(document_path, options):
        '''Publish a version based on *options*.'''
        session = FTM().session
        try:
            # Create new or get existing asset
            asset = session.ensure('Asset', {
                'context_id': options['parent'],
                'type_id': options['type'],
                'name': options['name']
            })
            session.commit()
            version = create_asset_version(asset, options)
            
            # Commit before adding components to ensure structures dependent on
            # committed ancestors work as expected.
            session.commit()
            
            component = version.create_component(
                document_path,
                data=dict(name=options['name'], filetype=os.path.splitext(document_path)[1]),
                location='auto'
            )
            session.commit()
    
        except Exception:
            # On any exception, rollback and re-raise error
            session.rollback()
            raise
        
        return version['id']
    
    def main():
        try:
            project_name = 'pha_project_test_01'
            file_path = r'C:\Users\kjain\Desktop\test.tga'
    
            session = FTM().session
            FTM().set_project_id(project_name)
            folder = get_or_create_ftrack_folder('krj.test')
            session.commit()
    
            options = {
                'parent': folder['id'],
                'type': FTM().get_type('Image Sequence')['id'],
                'name': os.path.basename(file_path),
                'task': FTM().get_task('03_COLOR')['id'],
                'description': 'test'
            }
            
            publish(file_path, options)
            session.commit()
    
        except Exception as e:
            print(f"Error uploading file: {e}")
    
    main()

    where FtrackManager is just this:
     

    import ftrack_api
    from pha.singleton import Singleton
    
    class FtrackManager(metaclass=Singleton):
        def __init__(self, api_user=None):
            server_url = 'my-server-url'
            api_key = 'my-api-key'
            api_user = 'my-user-email'
            self.session = ftrack_api.Session(server_url, api_key, api_user)
            self.project_id = None
        
        def set_project_id(self, project_name):
            # Query the project by name
            projects = self.session.query('Project where name is "{}"'.format(project_name))
    
            # If the project is found, set its ID
            if projects:
                self.project_id = projects[0]['id']
    
        def get_project_id(self):
            return self.project_id
        
        def get_type(self, type_name):
            query = self.session.query('AssetType where name is "{}"'.format(type_name)).one()
            return query if query else None
    
        def get_task(self, task_name):
            query = 'Task where project_id is "{}" and name is "{}"'.format(self.get_project_id(), task_name)
            task = self.session.query(query).first()
            return task if task else None

     

×
×
  • Create New...