{"kind":"AgentDefinition","metadata":{"namespace":"community","name":"dataverse-python-file-operations","version":"0.1.0"},"spec":{"agents_md":"# Dataverse SDK for Python - File Operations \u0026 Practical Examples\n\n## Overview\nComplete guide to file upload operations, chunking strategies, and practical real-world examples using the PowerPlatform-DataverseClient-Python SDK.\n\n---\n\n## 1. File Upload Fundamentals\n\n### Small File Upload (\u003c 128 MB)\n```python\nfrom pathlib import Path\nfrom PowerPlatform.Dataverse.client import DataverseClient\n\nfile_path = Path(\"document.pdf\")\nrecord_id = \"account-guid\"\n\n# Single PATCH upload for small files\nresponse = client.upload_file(\n    table_name=\"account\",\n    record_id=record_id,\n    file_column_name=\"new_documentfile\",\n    file_path=file_path\n)\n\nprint(f\"Upload successful: {response}\")\n```\n\n**When to use:** Documents, images, PDFs under 128 MB\n\n### Large File Upload with Chunking\n```python\nfrom pathlib import Path\n\nfile_path = Path(\"large_video.mp4\")\nrecord_id = \"account-guid\"\n\n# SDK automatically handles chunking for large files\nresponse = client.upload_file(\n    table_name=\"account\",\n    record_id=record_id,\n    file_column_name=\"new_videofile\",\n    file_path=file_path,\n    chunk_size=4 * 1024 * 1024  # 4 MB chunks\n)\n\nprint(\"Chunked upload complete\")\n```\n\n**When to use:** Large videos, databases, archives \u003e 128 MB\n\n### Upload with Progress Tracking\n```python\nimport hashlib\nfrom pathlib import Path\n\ndef calculate_file_hash(file_path):\n    \"\"\"Calculate SHA-256 hash of file.\"\"\"\n    hash_obj = hashlib.sha256()\n    with open(file_path, 'rb') as f:\n        for chunk in iter(lambda: f.read(1024*1024), b''):\n            hash_obj.update(chunk)\n    return hash_obj.hexdigest()\n\ndef upload_with_tracking(client, table_name, record_id, column_name, file_path):\n    \"\"\"Upload file with validation tracking.\"\"\"\n    file_path = 
Path(file_path)\n    file_size = file_path.stat().st_size\n    \n    print(f\"Starting upload: {file_path.name} ({file_size / 1024 / 1024:.2f} MB)\")\n    \n    # Calculate hash before upload\n    original_hash = calculate_file_hash(file_path)\n    print(f\"File hash: {original_hash}\")\n    \n    # Perform upload\n    response = client.upload_file(\n        table_name=table_name,\n        record_id=record_id,\n        file_column_name=column_name,\n        file_path=file_path\n    )\n    \n    print(f\"✓ Upload complete\")\n    return response\n\n# Usage\nupload_with_tracking(client, \"account\", account_id, \"new_documentfile\", \"report.pdf\")\n```\n\n---\n\n## 2. Upload Strategies \u0026 Configuration\n\n### Automatic Chunking Decision\n```python\ndef upload_file_smart(client, table_name, record_id, column_name, file_path):\n    \"\"\"Upload with automatic strategy selection.\"\"\"\n    file_path = Path(file_path)\n    file_size = file_path.stat().st_size\n    max_single_patch = 128 * 1024 * 1024  # 128 MB\n    \n    if file_size \u003c= max_single_patch:\n        print(f\"Using single PATCH (file \u003c 128 MB)\")\n        chunk_size = None  # SDK will use single request\n    else:\n        print(f\"Using chunked upload (file \u003e 128 MB)\")\n        chunk_size = 4 * 1024 * 1024  # 4 MB chunks\n    \n    response = client.upload_file(\n        table_name=table_name,\n        record_id=record_id,\n        file_column_name=column_name,\n        file_path=file_path,\n        chunk_size=chunk_size\n    )\n    \n    return response\n\n# Usage\nupload_file_smart(client, \"account\", account_id, \"new_largemedifile\", \"video.mp4\")\n```\n\n### Batch File Uploads\n```python\nfrom pathlib import Path\nfrom PowerPlatform.Dataverse.core.errors import HttpError\n\ndef batch_upload_files(client, table_name, record_id, files_dict):\n    \"\"\"\n    Upload multiple files to different columns of same record.\n    \n    Args:\n        table_name: Table name\n        
record_id: Record ID\n        files_dict: {\"column_name\": \"file_path\", ...}\n    \n    Returns:\n        {\"success\": [...], \"failed\": [...]}\n    \"\"\"\n    results = {\"success\": [], \"failed\": []}\n    \n    for column_name, file_path in files_dict.items():\n        try:\n            print(f\"Uploading {Path(file_path).name} to {column_name}...\")\n            response = client.upload_file(\n                table_name=table_name,\n                record_id=record_id,\n                file_column_name=column_name,\n                file_path=file_path\n            )\n            results[\"success\"].append({\n                \"column\": column_name,\n                \"file\": Path(file_path).name,\n                \"response\": response\n            })\n            print(f\"  ✓ Uploaded successfully\")\n        except HttpError as e:\n            results[\"failed\"].append({\n                \"column\": column_name,\n                \"file\": Path(file_path).name,\n                \"error\": str(e)\n            })\n            print(f\"  ❌ Upload failed: {e}\")\n    \n    return results\n\n# Usage\nfiles = {\n    \"new_contractfile\": \"contract.pdf\",\n    \"new_specfile\": \"specification.docx\",\n    \"new_designfile\": \"design.png\"\n}\nresults = batch_upload_files(client, \"account\", account_id, files)\nprint(f\"Success: {len(results['success'])}, Failed: {len(results['failed'])}\")\n```\n\n### Resume Failed Uploads\n```python\nfrom pathlib import Path\nimport time\nfrom PowerPlatform.Dataverse.core.errors import HttpError\n\ndef upload_with_retry(client, table_name, record_id, column_name, file_path, max_retries=3):\n    \"\"\"Upload with exponential backoff retry logic.\"\"\"\n    file_path = Path(file_path)\n    \n    for attempt in range(max_retries):\n        try:\n            print(f\"Upload attempt {attempt + 1}/{max_retries}: {file_path.name}\")\n            response = client.upload_file(\n                table_name=table_name,\n           
     record_id=record_id,\n                file_column_name=column_name,\n                file_path=file_path,\n                chunk_size=4 * 1024 * 1024\n            )\n            print(f\"✓ Upload successful\")\n            return response\n        except HttpError as e:\n            if attempt == max_retries - 1:\n                print(f\"❌ Upload failed after {max_retries} attempts\")\n                raise\n            \n            # Exponential backoff: 1s, 2s, 4s\n            backoff_seconds = 2 ** attempt\n            print(f\"⚠ Upload failed. Retrying in {backoff_seconds}s...\")\n            time.sleep(backoff_seconds)\n\n# Usage\nupload_with_retry(client, \"account\", account_id, \"new_documentfile\", \"contract.pdf\")\n```\n\n---\n\n## 3. Real-World Examples\n\n### Example 1: Customer Document Management System\n\n```python\nfrom pathlib import Path\nfrom datetime import datetime\nfrom enum import IntEnum\nfrom PowerPlatform.Dataverse.client import DataverseClient\nfrom azure.identity import ClientSecretCredential\n\nclass DocumentType(IntEnum):\n    CONTRACT = 1\n    INVOICE = 2\n    SPECIFICATION = 3\n    OTHER = 4\n\n# Setup\ncredential = ClientSecretCredential(\n    tenant_id=\"tenant-id\",\n    client_id=\"client-id\",\n    client_secret=\"client-secret\"\n)\nclient = DataverseClient(\"https://yourorg.crm.dynamics.com\", credential)\n\ndef upload_customer_document(customer_id, doc_path, doc_type):\n    \"\"\"Upload document for customer.\"\"\"\n    doc_path = Path(doc_path)\n    \n    # Create document record\n    doc_record = {\n        \"new_documentname\": doc_path.stem,\n        \"new_documenttype\": doc_type,\n        \"new_customerid\": customer_id,\n        \"new_uploadeddate\": datetime.now().isoformat(),\n        \"new_filesize\": doc_path.stat().st_size\n    }\n    \n    doc_ids = client.create(\"new_customerdocument\", doc_record)\n    doc_id = doc_ids[0]\n    \n    # Upload file\n    print(f\"Uploading {doc_path.name}...\")\n    
client.upload_file(\n        table_name=\"new_customerdocument\",\n        record_id=doc_id,\n        file_column_name=\"new_documentfile\",\n        file_path=doc_path\n    )\n    \n    print(f\"✓ Document uploaded and linked to customer\")\n    return doc_id\n\n# Usage\ncustomer_id = \"customer-guid-here\"\ndoc_id = upload_customer_document(\n    customer_id,\n    \"contract.pdf\",\n    DocumentType.CONTRACT\n)\n\n# Query uploaded documents\ndocs = client.get(\n    \"new_customerdocument\",\n    filter=f\"new_customerid eq '{customer_id}'\",\n    select=[\"new_documentname\", \"new_documenttype\", \"new_uploadeddate\"]\n)\n\nfor page in docs:\n    for doc in page:\n        print(f\"- {doc['new_documentname']} ({doc['new_uploadeddate']})\")\n```\n\n### Example 2: Media Gallery with Thumbnails\n\n```python\nfrom pathlib import Path\nfrom datetime import datetime\nfrom enum import IntEnum\nfrom PowerPlatform.Dataverse.client import DataverseClient\n\nclass MediaType(IntEnum):\n    PHOTO = 1\n    VIDEO = 2\n    DOCUMENT = 3\n\ndef create_media_gallery(client, gallery_name, media_files):\n    \"\"\"\n    Create media gallery with multiple files.\n    \n    Args:\n        gallery_name: Gallery name\n        media_files: [{\"file\": path, \"type\": MediaType, \"description\": text}, ...]\n    \"\"\"\n    # Create gallery record\n    gallery_ids = client.create(\"new_mediagallery\", {\n        \"new_galleryname\": gallery_name,\n        \"new_createddate\": datetime.now().isoformat()\n    })\n    gallery_id = gallery_ids[0]\n    \n    # Create and upload media items\n    for media_info in media_files:\n        file_path = Path(media_info[\"file\"])\n        \n        # Create media item record\n        item_ids = client.create(\"new_mediaitem\", {\n            \"new_itemname\": file_path.stem,\n            \"new_mediatype\": media_info[\"type\"],\n            \"new_description\": media_info.get(\"description\", \"\"),\n            \"new_galleryid\": gallery_id,\n            \"new_filesize\": 
file_path.stat().st_size\n        })\n        item_id = item_ids[0]\n        \n        # Upload media file\n        print(f\"Uploading {file_path.name}...\")\n        client.upload_file(\n            table_name=\"new_mediaitem\",\n            record_id=item_id,\n            file_column_name=\"new_mediafile\",\n            file_path=file_path\n        )\n        print(f\"  ✓ {file_path.name}\")\n    \n    return gallery_id\n\n# Usage\nmedia_files = [\n    {\"file\": \"photo1.jpg\", \"type\": MediaType.PHOTO, \"description\": \"Product shot 1\"},\n    {\"file\": \"photo2.jpg\", \"type\": MediaType.PHOTO, \"description\": \"Product shot 2\"},\n    {\"file\": \"demo.mp4\", \"type\": MediaType.VIDEO, \"description\": \"Product demo video\"},\n    {\"file\": \"manual.pdf\", \"type\": MediaType.DOCUMENT, \"description\": \"User manual\"}\n]\n\ngallery_id = create_media_gallery(client, \"Q4 Product Launch\", media_files)\nprint(f\"Created gallery: {gallery_id}\")\n```\n\n### Example 3: Backup \u0026 Archival System\n\n```python\nfrom pathlib import Path\nfrom datetime import datetime, timedelta\nfrom PowerPlatform.Dataverse.client import DataverseClient\nfrom PowerPlatform.Dataverse.core.errors import DataverseError\nimport json\n\ndef backup_table_data(client, table_name, output_dir):\n    \"\"\"\n    Backup table data to JSON files and create archive record.\n    \"\"\"\n    output_dir = Path(output_dir)\n    output_dir.mkdir(exist_ok=True)\n    \n    backup_time = datetime.now()\n    backup_file = output_dir / f\"{table_name}_{backup_time.strftime('%Y%m%d_%H%M%S')}.json\"\n    \n    print(f\"Backing up {table_name}...\")\n    \n    # Retrieve all records\n    all_records = []\n    for page in client.get(table_name, top=5000):\n        all_records.extend(page)\n    \n    # Write to JSON\n    with open(backup_file, 'w') as f:\n        json.dump(all_records, f, indent=2, default=str)\n    \n    print(f\"  ✓ Exported {len(all_records)} records\")\n    \n    # Create backup 
record in Dataverse\n    backup_ids = client.create(\"new_backuprecord\", {\n        \"new_tablename\": table_name,\n        \"new_recordcount\": len(all_records),\n        \"new_backupdate\": backup_time.isoformat(),\n        \"new_status\": 1  # Completed\n    })\n    backup_id = backup_ids[0]\n    \n    # Upload backup file\n    print(f\"Uploading backup file...\")\n    client.upload_file(\n        table_name=\"new_backuprecord\",\n        record_id=backup_id,\n        file_column_name=\"new_backupfile\",\n        file_path=backup_file\n    )\n    \n    return backup_id\n\n# Usage\nbackup_id = backup_table_data(client, \"account\", \"backups\")\nprint(f\"Backup created: {backup_id}\")\n```\n\n### Example 4: Automated Report Generation \u0026 Storage\n\n```python\nfrom pathlib import Path\nfrom datetime import datetime\nfrom enum import IntEnum\nfrom PowerPlatform.Dataverse.client import DataverseClient\nimport json\n\nclass ReportStatus(IntEnum):\n    PENDING = 1\n    PROCESSING = 2\n    COMPLETED = 3\n    FAILED = 4\n\ndef generate_and_store_report(client, report_type, data):\n    \"\"\"\n    Generate report from data and store in Dataverse.\n    \"\"\"\n    report_time = datetime.now()\n    \n    # Generate report file (simulated)\n    report_file = Path(f\"report_{report_type}_{report_time.strftime('%Y%m%d_%H%M%S')}.json\")\n    with open(report_file, 'w') as f:\n        json.dump(data, f, indent=2)\n    \n    # Create report record\n    report_ids = client.create(\"new_report\", {\n        \"new_reportname\": f\"{report_type} Report\",\n        \"new_reporttype\": report_type,\n        \"new_generateddate\": report_time.isoformat(),\n        \"new_status\": ReportStatus.PROCESSING,\n        \"new_recordcount\": len(data.get(\"records\", []))\n    })\n    report_id = report_ids[0]\n    \n    try:\n        # Upload report file\n        print(f\"Uploading report: {report_file.name}\")\n        client.upload_file(\n            table_name=\"new_report\",\n        
    record_id=report_id,\n            file_column_name=\"new_reportfile\",\n            file_path=report_file\n        )\n        \n        # Update status to completed\n        client.update(\"new_report\", report_id, {\n            \"new_status\": ReportStatus.COMPLETED\n        })\n        \n        print(f\"✓ Report stored successfully\")\n        return report_id\n        \n    except Exception as e:\n        print(f\"❌ Report generation failed: {e}\")\n        client.update(\"new_report\", report_id, {\n            \"new_status\": ReportStatus.FAILED,\n            \"new_errormessage\": str(e)\n        })\n        raise\n    finally:\n        # Clean up temp file\n        report_file.unlink(missing_ok=True)\n\n# Usage\nsales_data = {\n    \"month\": \"January\",\n    \"records\": [\n        {\"product\": \"A\", \"sales\": 10000},\n        {\"product\": \"B\", \"sales\": 15000},\n        {\"product\": \"C\", \"sales\": 8000}\n    ]\n}\n\nreport_id = generate_and_store_report(client, \"SALES_SUMMARY\", sales_data)\n```\n\n---\n\n## 4. 
File Management Best Practices\n\n### File Size Validation\n```python\nfrom pathlib import Path\n\ndef validate_file_for_upload(file_path, max_size_mb=500):\n    \"\"\"Validate file before upload.\"\"\"\n    file_path = Path(file_path)\n    \n    if not file_path.exists():\n        raise FileNotFoundError(f\"File not found: {file_path}\")\n    \n    file_size = file_path.stat().st_size\n    max_size_bytes = max_size_mb * 1024 * 1024\n    \n    if file_size \u003e max_size_bytes:\n        raise ValueError(f\"File too large: {file_size / 1024 / 1024:.2f} MB \u003e {max_size_mb} MB\")\n    \n    return file_size\n\n# Usage\ntry:\n    size = validate_file_for_upload(\"document.pdf\", max_size_mb=128)\n    print(f\"File valid: {size / 1024 / 1024:.2f} MB\")\nexcept (FileNotFoundError, ValueError) as e:\n    print(f\"Validation failed: {e}\")\n```\n\n### Supported File Types Validation\n```python\nfrom pathlib import Path\n\nALLOWED_EXTENSIONS = {'.pdf', '.docx', '.xlsx', '.jpg', '.png', '.mp4', '.zip'}\n\ndef validate_file_type(file_path):\n    \"\"\"Validate file extension.\"\"\"\n    file_path = Path(file_path)\n    \n    if file_path.suffix.lower() not in ALLOWED_EXTENSIONS:\n        raise ValueError(f\"Unsupported file type: {file_path.suffix}\")\n    \n    return True\n\n# Usage\ntry:\n    validate_file_type(\"document.pdf\")\n    print(\"File type valid\")\nexcept ValueError as e:\n    print(f\"Invalid: {e}\")\n```\n\n### Upload Logging \u0026 Audit Trail\n```python\nfrom pathlib import Path\nfrom datetime import datetime\nimport json\n\ndef log_file_upload(table_name, record_id, file_path, status, error=None):\n    \"\"\"Log file upload for audit trail.\"\"\"\n    file_path = Path(file_path)\n    \n    log_entry = {\n        \"timestamp\": datetime.now().isoformat(),\n        \"table\": table_name,\n        \"record_id\": record_id,\n        \"file_name\": file_path.name,\n        \"file_size\": file_path.stat().st_size if file_path.exists() else 0,\n        
\"status\": status,\n        \"error\": error\n    }\n    \n    # Append to log file\n    log_file = Path(\"upload_audit.log\")\n    with open(log_file, 'a') as f:\n        f.write(json.dumps(log_entry) + \"\\n\")\n    \n    return log_entry\n\n# Usage in upload wrapper\ndef upload_with_logging(client, table_name, record_id, column_name, file_path):\n    \"\"\"Upload with audit logging.\"\"\"\n    try:\n        client.upload_file(\n            table_name=table_name,\n            record_id=record_id,\n            file_column_name=column_name,\n            file_path=file_path\n        )\n        log_file_upload(table_name, record_id, file_path, \"SUCCESS\")\n    except Exception as e:\n        log_file_upload(table_name, record_id, file_path, \"FAILED\", str(e))\n        raise\n```\n\n---\n\n## 5. Troubleshooting File Operations\n\n### Common Issues \u0026 Solutions\n\n#### Issue: File Upload Timeout\n```python\n# For very large files, increase chunk size strategically\nresponse = client.upload_file(\n    table_name=\"account\",\n    record_id=record_id,\n    file_column_name=\"new_file\",\n    file_path=\"large_file.zip\",\n    chunk_size=8 * 1024 * 1024  # 8 MB chunks\n)\n```\n\n#### Issue: Insufficient Disk Space\n```python\nimport shutil\nfrom pathlib import Path\n\ndef check_upload_space(file_path):\n    \"\"\"Check if system has space for file + temp buffer.\"\"\"\n    file_path = Path(file_path)\n    file_size = file_path.stat().st_size\n    \n    # Get disk space\n    total, used, free = shutil.disk_usage(file_path.parent)\n    \n    # Need file_size + 10% buffer\n    required_space = file_size * 1.1\n    \n    if free \u003c required_space:\n        raise OSError(f\"Insufficient disk space: {free / 1024 / 1024:.0f} MB free, {required_space / 1024 / 1024:.0f} MB needed\")\n    \n    return True\n```\n\n#### Issue: File Corruption During Upload\n```python\nimport hashlib\n\ndef verify_uploaded_file(local_path, remote_data):\n    \"\"\"Verify uploaded file 
integrity.\"\"\"\n    # Calculate local hash\n    with open(local_path, 'rb') as f:\n        local_hash = hashlib.sha256(f.read()).hexdigest()\n    \n    # Compare with metadata\n    remote_hash = remote_data.get(\"new_filehash\")\n    \n    if local_hash != remote_hash:\n        raise ValueError(\"File corruption detected: hash mismatch\")\n    \n    return True\n```\n\n---\n\n## Reference\n- [Official File Upload Example](https://github.com/microsoft/PowerPlatform-DataverseClient-Python/blob/main/examples/advanced/file_upload.py)\n- [File Upload Best Practices](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/file-column-data)\n","description":"Complete guide to file upload operations, chunking strategies, and practical real-world examples using the PowerPlatform-DataverseClient-Python SDK.","import":{"commit_sha":"541b7819d8c3545c6df122491af4fa1eae415779","imported_at":"2026-05-18T20:05:35Z","license_text":"MIT License\n\nCopyright GitHub, Inc.\n\nPermission is hereby granted, free of charge, to any person obtaining a copy\nof this software and associated documentation files (the \"Software\"), to deal\nin the Software without restriction, including without limitation the rights\nto use, copy, modify, merge, publish, distribute, sublicense, and/or sell\ncopies of the Software, and to permit persons to whom the Software is\nfurnished to do so, subject to the following conditions:\n\nThe above copyright notice and this permission notice shall be included in all\ncopies or substantial portions of the Software.\n\nTHE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\nIMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\nFITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE\nAUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\nLIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\nOUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE\nSOFTWARE.","owner":"github","repo":"github/awesome-copilot","source_url":"https://github.com/github/awesome-copilot/blob/541b7819d8c3545c6df122491af4fa1eae415779/instructions/dataverse-python-file-operations.instructions.md"},"manifest":{}},"content_hash":[239,60,67,215,237,93,41,35,87,154,167,221,58,121,152,101,216,115,86,108,116,179,170,61,18,224,165,180,135,194,221,158],"trust_level":"unsigned","yanked":false}
