This page describes all SDK methods for video operations.
Uploading Videos
The upload() method handles all video ingestion. The return type differs depending on the source:
- Local files & URLs return a video_id.
- Cloud imports (gs://, s3://) return an import_job_id — use get_import_job() and get_import_job_videos() to retrieve video IDs.
Local File & URL Uploads
Upload local files or HTTP/HTTPS URLs. Returns a video_id per video.
Local file & URL upload examples
# Single local file
result = client.upload("video.mp4")
# Single video with custom display name
result = client.upload("video.mp4", name="Morning Commute.mp4")
# Single video with metadata
result = client.upload(("video.mp4", "video.json"))
# Multiple local files
batch = client.upload(["a.mp4", "b.mp4"])
# Multiple videos with mixed metadata
batch = client.upload([
("video1.mp4", "video1.json"), # Video with metadata
"video2.mp4", # Video without metadata
("video3.mp4", "video3.json") # Another video with metadata
])
# Batch upload with per-video custom names (dict syntax)
batch = client.upload([
{"video": "dashcam_001.mp4", "name": "Trip to Downtown"},
{"video": "dashcam_002.mp4", "name": "Highway Merge"},
{"video": "dashcam_003.mp4", "name": "Parking Lot Exit", "metadata": "trip3.json"}
])
# Public GCS URL (any accessible HTTPS URL)
remote = client.upload("https://storage.googleapis.com/my-bucket/videos/demo.mp4")
# URL with custom display name
remote = client.upload(
"https://storage.googleapis.com/my-bucket/videos/abc123.mp4",
name="scene_1.mp4"
)
# With folder organization
result = client.upload("video.mp4", folder="my_folder")
# With metadata JSON file and folder
result = client.upload(("dashcam.mp4", "dashcam.json"), folder="fleet_videos")
# Organization scope
result = client.upload("launch.mp4", folder="robotics_org", scope="org")
Multi-view (local/URL)
Use a dict mapping view names to local files or URLs. front is required in every set.
# Single multi-view set
multi = client.upload(
{
"front": "https://example.com/trip-042/front.mp4",
"left": "https://example.com/trip-042/left.mp4",
"right": "https://example.com/trip-042/right.mp4",
},
folder="fleet_uploads",
scope="org",
)
# Multiple multi-view sets in one call
multi_batch = client.upload([
{
"front": "https://example.com/set1/front.mp4",
"left": "https://example.com/set1/left.mp4",
"right": "https://example.com/set1/right.mp4",
},
{
"front": "https://example.com/set2/front.mp4",
"left": "https://example.com/set2/left.mp4",
"right": "https://example.com/set2/right.mp4",
},
])
Local/HTTP multi-view uploads return the stitched front video_id.
- Metadata sidecars must share the same base filename as the video (e.g., launch.mp4 + launch.json). See the Metadata Ingestion Spec for the full schema.
- Custom names (name param) are supported for single files, URLs, and batch dict syntax — not for cloud imports or multi-view.
- Folders are auto-created if they don’t exist. Defaults to personal scope; use scope="org" for shared org folders.
Required Parameters:
| Parameter | Type | Description |
|---|---|---|
| videos | str \| Path \| tuple \| Sequence | Single video, (video, metadata) tuple, or list of mixed videos/tuples |
Optional Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | None | Custom display name for the uploaded video. Overrides the original filename. Only supported for single file uploads, URLs, or per-video in batch dict syntax. Not supported for cloud imports or multi-view uploads. |
| folder | str | None | Folder name for organizing uploads (unique within each scope) |
| metadata_file | str \| Path | None | Overlay metadata JSON file (must share the video’s base filename) per spec (ignored when using tuples) |
| scope | 'user' \| 'org' | 'user' | Scope hint for folder resolution. Use 'org' for shared org folders and 'user' for personal uploads. |
| upload_timeout | int | 1200 | Timeout in seconds for upload completion |
| wait_for_uploaded | bool | True | Wait until upload is complete |
Returns: Dict (single) or List[Dict] (multiple) with {"video_id": "...", "status": "processing" | "uploaded" | ...}
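Because upload() returns a dict for a single video and a list of dicts for a batch, downstream code often wants a small normalizer. A minimal sketch; the helper names here are ours, not part of the SDK:

```python
from typing import Any, Dict, List, Union

def normalize_upload_result(
    result: Union[Dict[str, Any], List[Dict[str, Any]]]
) -> List[Dict[str, Any]]:
    """Wrap a single-upload dict in a list so both return shapes iterate the same way."""
    return [result] if isinstance(result, dict) else list(result)

def collect_video_ids(
    result: Union[Dict[str, Any], List[Dict[str, Any]]]
) -> List[str]:
    """Pull every video_id out of an upload() return value, single or batch."""
    return [entry["video_id"] for entry in normalize_upload_result(result)]
```

This lets the same post-upload code path handle `client.upload("video.mp4")` and `client.upload(["a.mp4", "b.mp4"])` without branching.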
Cloud Imports (GCS / S3)
upload() for cloud URIs
Provide full gs:// or s3:// URIs to import videos from cloud storage.
# Import from GCS
batch = client.upload([
"gs://drive-monitor/uploads/trip-042/front.mp4",
"gs://drive-monitor/uploads/trip-042/rear.mp4",
])
# Returns: {"import_job_id": "ij_xxx", "status": "importing"}
# Import from S3 with a specific integration
client.upload(
"s3://drive-monitor-archive/2024-09-01/front.mp4",
integration_id="aws-prod",
)
# Import into a folder
client.upload(
["s3://my-bucket/videos/clip_001.mp4", "s3://my-bucket/videos/clip_002.mp4"],
folder="fleet_uploads",
scope="org",
)
Cloud-specific parameter:
| Parameter | Type | Default | Description |
|---|---|---|---|
| integration_id | str | None | Saved cloud integration identifier to use for imports. When omitted, the SDK attempts to match the bucket against saved integrations. |
All other parameters (folder, scope, etc.) are shared with local uploads — see the full parameter table above.
Returns: {"import_job_id": "ij_xxx", "status": "importing"}
Cloud imports accept .mp4 objects referenced by full gs://bucket/object.mp4 or s3://bucket/object.mp4 URIs. Wildcard patterns are not supported—list each object explicitly. When no integration_id is provided, the SDK tries each saved integration whose bucket matches the URI until one succeeds; specify the id when multiple integrations share the bucket.
Cloud import requests are acknowledged asynchronously. Even when a request later fails validation or authorization in background processing, the API still returns an import_job_id; those failures surface as UPLOADING_FAILED rows in get_import_job_videos().
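Because failures surface asynchronously as UPLOADING_FAILED rows, it is handy to bucket the rows returned by get_import_job_videos() by outcome. A sketch over plain dicts; the function name is ours:

```python
from typing import Dict, Iterable, List, Tuple

# Terminal statuses documented for cloud imports
TERMINAL = {"UPLOADED", "UPLOADING_FAILED"}

def partition_import_rows(
    rows: Iterable[Dict[str, str]],
) -> Tuple[List[Dict[str, str]], List[Dict[str, str]], List[Dict[str, str]]]:
    """Split get_import_job_videos() rows into uploaded, failed, and still-pending."""
    uploaded: List[Dict[str, str]] = []
    failed: List[Dict[str, str]] = []
    pending: List[Dict[str, str]] = []
    for row in rows:
        status = row.get("status")
        if status == "UPLOADED":
            uploaded.append(row)
        elif status == "UPLOADING_FAILED":
            failed.append(row)
        else:
            pending.append(row)
    return uploaded, failed, pending
```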
Multi-view (cloud)
Cloud multi-view uploads use dict mappings with gs:// or s3:// URIs. front is required in every set.
# Single cloud multi-view set
multi = client.upload(
{
"front": "s3://drive-monitor/uploads/trip-042/front.mp4",
"left": "s3://drive-monitor/uploads/trip-042/left.mp4",
"right": "s3://drive-monitor/uploads/trip-042/right.mp4",
},
folder="fleet_uploads",
scope="org",
)
# Returns: {"import_job_id": "ij_xxx", "status": "importing"}
- Multiple cloud multi-view sets submitted in one upload([...]) call produce one import job.
- wait_for_uploaded=True is ignored for cloud multi-view uploads.
- For multi-view import jobs, get_import_job_videos() returns front rows only and total is the requested set count.
get_import_job()
Fetch metadata for a cloud import job.
upload = client.upload(
[
"s3://drive-monitor-archive/2024-09-01/front.mp4",
"s3://drive-monitor-archive/2024-09-01/rear.mp4",
],
wait_for_uploaded=False,
)
job = client.video.get_import_job(upload["import_job_id"])
print(job["job_id"], job["total"])
Required Parameters:
| Parameter | Type | Description |
|---|---|---|
| import_job_id | str | Cloud import job ID returned by upload() |
Returns: Dict with job metadata fields such as:
- job_id
- source ("s3" or "gcs")
- bucket
- prefix
- folder_id
- folder_name
- total
- timestamps (created_at, completed_at, updated_at when available)
For high-volume imports, treat total + get_import_job_videos() as the readiness contract.
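That readiness contract can be expressed as a small pure function. A sketch under the statuses documented above; `import_job_ready` is our name, not an SDK method:

```python
from typing import Dict

TERMINAL = {"UPLOADED", "UPLOADING_FAILED"}

def import_job_ready(total: int, statuses: Dict[str, str]) -> bool:
    """True once every requested video has materialized and reached a terminal state.

    statuses maps video_id -> status, accumulated across get_import_job_videos()
    pages; total comes from get_import_job().
    """
    if total <= 0:
        return False  # job metadata not populated yet
    return len(statuses) >= total and all(s in TERMINAL for s in statuses.values())
```

Call it inside a polling loop and stop when it returns True.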
get_import_job_videos()
Fetch paginated per-video upload statuses for a cloud import job.
This endpoint is designed for large import jobs where returning all rows in one
response would be expensive (for example, thousands to hundreds of thousands of
videos). For jobs with more than ~1,000 videos, prefer cursor-based pagination
(limit + cursor) instead of requesting everything at once.
upload = client.upload(
[
"gs://drive-monitor/uploads/trip-042/front.mp4",
"gs://drive-monitor/uploads/trip-042/rear.mp4",
],
wait_for_uploaded=False,
)
cursor = None # First page: no cursor
while True:
result = client.video.get_import_job_videos(
upload["import_job_id"],
limit=500,
cursor=cursor, # Pass cursor from previous response
)
print(result["import_job_id"])
print(result["video_count"], result["has_more"], result["next_cursor"]) # page_size, more pages?, next anchor
print(result["videos"][:3]) # [{video_id, status, import_source_uri}, ...]
if not result["has_more"]:
break
cursor = result["next_cursor"] # Use this value for the next page
Required Parameters:
| Parameter | Type | Description |
|---|---|---|
| import_job_id | str | Cloud import job ID returned by upload() |
Optional Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | int | 500 | Max videos to return in one page |
| cursor | str | None | Last video_id from previous page |
| offset | int | None | Offset-based fallback pagination (do not combine with cursor). Not supported for multi-view import jobs. |
Returns: Dict with:
- import_job_id
- total (requested count for this job; for multi-view jobs this is the requested set count)
- limit
- cursor
- has_more
- next_cursor
- videos (list of {video_id, status, import_source_uri} entries for detailed polling; multi-view jobs return front rows only)
- video_count
Cursor Notes:
- cursor is the next_cursor value from the previous response (i.e., the last video_id of that page).
- Start with cursor=None for the first page.
- Stop paging when has_more is False.
- Prefer cursor pagination for large jobs; offset is mainly a fallback/debug option.
- Do not pass both cursor and offset in the same request.
- For multi-view cloud imports, videos contains currently materialized IDs (best-effort); use total as the requested target count.
get_import_job_uploaded_video_ids() remains available as a deprecated alias of get_import_job_videos().
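The cursor rules above can be wrapped in a generator so callers never touch cursors directly. A sketch with the page-fetching call injected as a function (so it can wrap `client.video.get_import_job_videos(...)` in real use); the generator name is ours:

```python
from typing import Callable, Dict, Iterator, Optional

def iter_import_job_videos(
    fetch_page: Callable[[Optional[str]], Dict]
) -> Iterator[Dict]:
    """Yield every per-video row, following next_cursor until has_more is False.

    fetch_page(cursor) is expected to wrap a call like
    client.video.get_import_job_videos(job_id, limit=500, cursor=cursor).
    """
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)
        yield from page["videos"]  # one dict per video row
        if not page["has_more"]:
            return
        cursor = page["next_cursor"]  # anchor for the next page
```

Injecting `fetch_page` also makes the pagination logic testable without network access.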
End-to-end cloud import example
import time
# 1. Start the cloud import — returns an import_job_id, NOT a video_id
result = client.upload([
"s3://my-bucket/videos/clip_001.mp4",
"s3://my-bucket/videos/clip_002.mp4",
"s3://my-bucket/videos/clip_003.mp4",
])
print(result)
# {"import_job_id": "ij_a1b2c3d4e5f6", "status": "importing"}
job_id = result["import_job_id"]
# 2. Poll import-job videos until returned IDs reach the requested total
# and each returned row is terminal.
TERMINAL = {"UPLOADED", "UPLOADING_FAILED"}
seen = {}
while True:
job = client.video.get_import_job(job_id)
target_total = int(job.get("total") or 0)
cursor = None
while True:
page = client.video.get_import_job_videos(job_id, limit=500, cursor=cursor)
for row in page["videos"]:
seen[row["video_id"]] = row["status"]
if not page["has_more"]:
break
cursor = page["next_cursor"]
if target_total > 0:
all_ids_materialized = len(seen) >= target_total
all_terminal = all(status in TERMINAL for status in seen.values())
print(f"Import progress: ids={len(seen)}/{target_total}")
if all_ids_materialized and all_terminal:
break
time.sleep(10)
# 3. Use uploaded IDs for analysis
video_ids = [vid for vid, status in seen.items() if status == "UPLOADED"]
print(f"Imported {len(video_ids)} videos: {video_ids[:5]}...")
# 4. Now use the video IDs for analysis
client.analyze(
video_ids,
analysis_type=AnalysisType.ASK,
custom_event="lane departure",
)
Managing Cloud Integrations
Use this helper to manage reusable GCS/S3 credentials for cloud imports. See Cloud Storage Uploads for instructions on creating service-account keys and web UI setup.
Cloud integrations helper
# List every integration visible to your user/org
client.cloud_integrations.list()
# Filter by provider
client.cloud_integrations.list(type="gcs")
# Add a new GCS integration using a service account JSON file
client.cloud_integrations.add(
type="gcs",
name="Fleet bucket",
bucket="drive-monitor",
prefix="uploads/",
credentials="service-account.json", # path or dict/bytes
)
# Add a new S3 integration
client.cloud_integrations.add(
type="s3",
name="AWS archive",
bucket="drive-archive",
prefix="raw/",
region="us-east-1",
credentials={
"accessKeyId": "...",
"secretAccessKey": "...",
"sessionToken": "...", # optional
},
)
Analyzing Videos
Run analysis on one or more uploaded videos. Two modes are available:
- ASK (Rapid Review): Detect custom events using natural language descriptions.
- AGENT: Specialized motion and behavior detection powered by curated pipelines.
ASK
Detect custom events in videos using natural language descriptions. Perfect for finding specific scenarios like “green crosswalk” or “yellow taxi”. Works for all video lengths, including long videos, with fast results.
# Single-video prompt
client.analyze(
"abc123",
analysis_type=AnalysisType.ASK,
custom_event="vehicles parked on sidewalk"
)
# With thumbnails (creates annotated bounding boxes)
client.analyze(
"abc123",
analysis_type=AnalysisType.ASK,
custom_event="delivery vans double parked",
is_thumbnail=True
)
# With overlay extraction for telemetry data
from nomadicml import OverlayMode
client.analyze(
"abc123",
analysis_type=AnalysisType.ASK,
custom_event="speeding events",
overlay_mode=OverlayMode.CUSTOM # Extracts custom fields from uploaded metadata
)
# Batch Ask: analyze multiple IDs at once
client.analyze(
["abc123", "def456"],
analysis_type=AnalysisType.ASK,
custom_event="jaywalking near intersections"
)
# Batch Ask: analyze every video in a folder
client.analyze(
folder="fleet_uploads",
analysis_type=AnalysisType.ASK,
custom_event="jaywalking near intersections"
)
Required Parameters:
| Parameter | Type | Description |
|---|---|---|
| id(s) or folder | str \| Sequence[str] | Video ID(s) or folder name (use one, not both) |
| analysis_type | AnalysisType | Must be AnalysisType.ASK |
| custom_event | str | Event description to detect (e.g., “green crosswalk”) |
Optional Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| custom_category | CustomCategory \| str | "driving" | Optional context to steer the answer |
| model_id | str | "Nomadic-VL-XLarge" | AI model to use |
| timeout | int | 2400 | Analysis timeout in seconds |
| wait_for_completion | bool | True | Wait for analysis to complete |
| is_thumbnail | bool | False | Generate annotated bounding box thumbnails |
| return_subset | bool | False | Return subset of results |
| use_enhanced_motion_analysis | bool | False | Generate enhanced motion captions for events |
| confidence | str | "low" | Confidence level for event prediction: "low" or "high" |
| overlay_mode | OverlayMode \| str | None | Overlay extraction mode: OverlayMode.TIMESTAMPS, OverlayMode.GPS, or OverlayMode.CUSTOM (one at a time) |
nomadicml.video.CustomCategory
CustomCategory.DRIVING
CustomCategory.ROBOTICS
CustomCategory.AERIAL
CustomCategory.SECURITY
CustomCategory.ENVIRONMENT
nomadicml.video.OverlayMode
OverlayMode.TIMESTAMPS
OverlayMode.GPS
OverlayMode.CUSTOM
About Overlay Modes: Overlay modes let you extract telemetry data from on-screen overlays in your videos. Choose the mode based on your overlay type:
- TIMESTAMPS and GPS: Use these modes when your video has unstructured overlays visible on screen (like dashcam timestamps or GPS coordinates). Our models automatically detect and extract these values from the visual overlay text.
- CUSTOM: Use this mode when you’ve uploaded structured metadata JSON (per the spec) describing your custom overlay fields. This mode extracts the specific fields you’ve defined, like speed, altitude, or other telemetry values.
Remember: Metadata must be provided at upload time. The overlay_mode parameter only controls which extraction method to use during analysis.
Returns: Dict with video_id, analysis_id, mode, status, summary, and events.
- If is_thumbnail=True, each event includes an annotated_thumbnail_url.
- If overlay_mode is specified and the video was uploaded with metadata, each event includes an overlay field with extracted telemetry data as {field_name: {"start": value, "end": value}} pairs.
- Overlay values are only surfaced through the overlay field; the SDK no longer returns duplicate frame_*_start or frame_*_end keys at the root level.
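Given that overlay shape, a small helper can compute how each numeric field changed over the event window. A sketch over plain event dicts; `overlay_deltas` is our name, not an SDK method:

```python
from typing import Dict

def overlay_deltas(event: Dict) -> Dict[str, float]:
    """Compute end - start for every numeric overlay field on one event.

    Overlay values arrive as {field: {"start": value, "end": value}};
    non-numeric fields (e.g. extracted text) are skipped.
    """
    deltas: Dict[str, float] = {}
    for field, bounds in event.get("overlay", {}).items():
        start, end = bounds.get("start"), bounds.get("end")
        if isinstance(start, (int, float)) and isinstance(end, (int, float)):
            deltas[field] = end - start
    return deltas
```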
AGENT
Specialized motion and behavior detection powered by curated agent pipelines:
# General agent (edge-case detection)
client.analyze(
"abc123",
analysis_type=AnalysisType.GENERAL_AGENT,
)
# Lane change specialist
client.analyze(
"abc123",
analysis_type=AnalysisType.LANE_CHANGE,
)
# Batch agent run
client.analyze(
["abc123", "def456"],
analysis_type=AnalysisType.LANE_CHANGE
)
# Batch agent run using a folder
client.analyze(
folder="san_francisco_midday",
analysis_type=AnalysisType.LANE_CHANGE
)
AnalysisType.GENERAL_AGENT: zero-shot edge-case hunting (General Edge Case)
AnalysisType.LANE_CHANGE: lane-change manoeuvre detection
AnalysisType.TURN: left/right turn behaviour
AnalysisType.RELATIVE_MOTION: relative motion between vehicles
AnalysisType.DRIVING_VIOLATIONS: speeding, stop, red-light, and related violations
Required Parameters:
| Parameter | Type | Description |
|---|---|---|
| ids or folder | str \| Sequence[str] | Video ID(s) or folder name (use one, not both) |
| analysis_type | AnalysisType | One of AnalysisType.GENERAL_AGENT, AnalysisType.LANE_CHANGE, AnalysisType.TURN, AnalysisType.RELATIVE_MOTION, AnalysisType.DRIVING_VIOLATIONS |
Optional Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| model_id | str | "Nomadic-VL-XLarge" | AI model to use |
| timeout | int | 2400 | Analysis timeout in seconds |
| wait_for_completion | bool | True | Wait for analysis to complete |
| concept_ids | List[str] | None | Concept IDs for specialized detection |
| return_subset | bool | False | Return subset of results |
Returns: Dict with video_id, analysis_id, mode, status, and events.
Structured Exports
generate_structured_odd()
Produce an ASAM OpenODD-compliant CSV describing the vehicle’s operating domain.
from nomadicml import NomadicML, DEFAULT_STRUCTURED_ODD_COLUMNS
client = NomadicML(api_key="your_api_key")
# Use the default schema or customise it before calling the export.
columns = [
{
"name": "timestamp",
"prompt": "Log the timestamp in ISO 8601 format (placeholder date 2024-01-01).",
"type": "YYYY-MM-DDTHH:MM:SSZ",
},
{
"name": "scenery.road.type",
"prompt": "The type of road the vehicle is on.",
"type": "categorical",
"literals": ["motorway", "rural", "urban_street", "parking_lot", "unpaved", "unknown"],
},
]
odd = client.generate_structured_odd(
video_id="VIDEO_ID",
columns=columns or DEFAULT_STRUCTURED_ODD_COLUMNS,
)
print(odd["csv"])
print(odd.get("share_url"))
Required Parameters:
| Parameter | Type | Description |
|---|---|---|
| video_id | str | ID of the analysed video whose operating domain you want to export |
Optional Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| columns | Sequence[StructuredOddColumn] | DEFAULT_STRUCTURED_ODD_COLUMNS | Column definitions matching the UI schema (name, prompt, type, optional literals) |
| timeout | int | client default | Request timeout override in seconds |
Returns: Dict containing:
- csv: The generated CSV text.
- columns: The resolved column schema (after validation).
- reasoning_trace_path: Final Firestore path used for reasoning logs.
- share_id / share_url: Optional sharing metadata if the backend stored the export.
- processing_time: Time spent generating the export.
- raw: Full backend response payload for additional introspection.
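Since the backend validates the column schema, a client-side pre-check can catch obvious mistakes before the request is sent. A best-effort sketch based on the schema fields shown above (name, prompt, type, optional literals); `validate_odd_columns` is our helper, not an SDK method:

```python
from typing import Iterable, List, Mapping

def validate_odd_columns(columns: Iterable[Mapping]) -> List[str]:
    """Best-effort client-side check of a structured-ODD column schema.

    Each column needs name, prompt, and type; categorical columns also need
    a non-empty literals list. Returns a list of problems (empty if valid).
    """
    problems: List[str] = []
    for i, col in enumerate(columns):
        for key in ("name", "prompt", "type"):
            if not col.get(key):
                problems.append(f"column {i}: missing '{key}'")
        if col.get("type") == "categorical" and not col.get("literals"):
            problems.append(f"column {i}: categorical columns need 'literals'")
    return problems
```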
Video & Folder Management
my_videos()
Retrieve your uploaded videos, optionally filtered by folder.
# Get all videos
videos = client.my_videos()
# Get videos in specific folder
videos = client.my_videos(folder="my_folder")
# Get videos from a personal folder (when org folder has same name)
videos = client.my_videos(folder="shared_folder", scope="user")
# Get videos from an organization folder
videos = client.my_videos(folder="shared_folder", scope="org")
Optional Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| folder | str | None | Filter videos by folder name |
| scope | 'user' \| 'org' | None | Disambiguate folder lookup when personal and org folders share the same name. 'user' matches only personal folders, 'org' matches only organization folders. When None, personal folders are preferred. |
Returns: List[Dict] - Each dict contains:
| Field | Type | Description |
|---|---|---|
| video_id | str | Unique video identifier |
| video_name | str | Original filename |
| duration_s | float | Video duration in seconds |
| folder_id | str | Folder identifier |
| status | str | Upload status (see below) |
| folder_name | str | Folder name (when filtering by folder) |
| org_id | str | Organization ID (if org-scoped) |
Upload status values:
| Status | Meaning |
|---|---|
| processing | Upload in progress |
| uploading_failed | Upload failed |
| uploaded | Ready for analysis |
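Two small helpers make it easy to act on these statuses, e.g. to analyze only videos that are ready. A sketch over the list-of-dicts shape returned by my_videos(); the helper names are ours:

```python
from collections import Counter
from typing import Dict, List

def status_counts(videos: List[Dict]) -> Counter:
    """Tally my_videos() entries by upload status."""
    return Counter(v.get("status", "unknown") for v in videos)

def ready_video_ids(videos: List[Dict]) -> List[str]:
    """IDs of videos that finished uploading and can be analyzed."""
    return [v["video_id"] for v in videos if v.get("status") == "uploaded"]
```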
delete_video()
Remove a video by ID.
client.delete_video("video_id")
Parameters:
| Parameter | Type | Description |
|---|---|---|
| video_id | str | ID of the video to delete (required) |
Returns: Dict with deletion status
Folders
create_folder()
Create a new folder in a specific scope. Raises an error if a folder with the
same name already exists in the target scope.
marketing = client.create_folder("marketing", description="Q1 campaign")
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | — | Folder name to create |
| scope | 'user' \| 'org' | 'user' | Target scope for creation |
| description | str \| None | None | Optional folder description |
Returns: Dict with folder id, name, org_id, created_at, and description
get_folder()
Look up a folder by name. Defaults to your personal scope; pass scope="org" for organization folders.
folder = client.get_folder("fleet_uploads")
org_folder = client.get_folder("fleet_uploads", scope="org")
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | — | Folder name to look up |
| scope | 'user' \| 'org' | 'user' | Scope to search within |
Returns: Dict with folder id, name, org_id, scope, created_at, created_by, description, and video_count
Search
search()
Run semantic search across all analysed events inside a folder. You can use open-ended natural language queries.
results = client.search(
query="red pickup truck overtaking",
folder_name="my_fleet_uploads",
scope="org", # optional, defaults to "user"
)
print(results["summary"])
for thought in results["thoughts"]:
print("•", thought)
Required Parameters:
| Parameter | Type | Description |
|---|---|---|
| query | str | Natural-language search query |
| folder_name | str | Human-friendly folder name to search within |
Optional Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| scope | 'user' \| 'org' \| 'sample' | 'user' | Scope hint for folder resolution. Use 'org' for organization folders and 'sample' for demo/sample folders. |
Returns: Dict with:
- summary: string overview of the findings
- thoughts: list of reasoning steps (chain-of-thought) shown in the UI
- matches: list of {video_id, analysis_id, event_index, similarity, reason}
- session_id: identifier for the associated search session (useful for re-fetching or sharing)
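Matches carry a similarity score, so ranking the response is a one-liner. A sketch over the response dict; `top_matches` is our helper, not an SDK method:

```python
from typing import Dict, List

def top_matches(results: Dict, n: int = 5) -> List[Dict]:
    """Return the n highest-similarity matches from a search() response."""
    matches = results.get("matches", [])
    return sorted(matches, key=lambda m: m.get("similarity", 0.0), reverse=True)[:n]
```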
get_batch_analysis()
Retrieve analysis results for a completed batch. Optionally filter events by approval status (approved, rejected, pending, or invalid), or return event-level CSV.
# Get all results from a batch
batch_results = client.get_batch_analysis("batch_id")
# Filter for only approved events
approved_only = client.get_batch_analysis(
"batch_id",
filter="approved"
)
# Filter for multiple statuses
pending_and_rejected = client.get_batch_analysis(
"batch_id",
filter=["pending", "rejected"]
)
# Return event-level CSV instead of JSON
csv_text = client.get_batch_analysis("batch_id", as_csv=True)
# Filter + CSV (only approved events are included)
approved_csv = client.get_batch_analysis(
"batch_id",
filter="approved",
as_csv=True
)
# Save CSV to disk
with open("batch-results-approved.csv", "w", encoding="utf-8", newline="") as f:
f.write(approved_csv)
# Access batch metadata
print(batch_results["batch_metadata"]["batch_type"])
print(batch_results["batch_metadata"]["batch_viewer_url"])
# Iterate through video results
for result in batch_results["results"]:
print(f"Video: {result['video_id']}")
print(f"Events: {len(result['events'])}")
for event in result["events"]:
print(f" - {event.get('label', '')} at {event.get('t_start', '')}-{event.get('t_end', '')}")
Required Parameters:
| Parameter | Type | Description |
|---|---|---|
| batch_id | str | ID of the batch to retrieve (required) |
Optional Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| filter | str \| List[str] | None | Filter events by approval status. Valid values: 'approved', 'rejected', 'pending', 'invalid'. If omitted, returns all events for all videos (including videos with zero events). If provided, only matching events are returned and videos with zero matching events are excluded. |
| as_csv | bool | False | If True, returns CSV text instead of JSON. CSV is event-level (one row per event) and does not emit placeholder rows for videos with zero events. |
Returns (as_csv=False, default): Dict with two keys:
- batch_metadata: Contains batch information
  - batch_id: The batch identifier
  - batch_viewer_url: URL to view batch results in the web UI
  - batch_type: Type of batch ("ask" or "agent")
  - analysis_type: The analysis type used
  - review_status: Whether the batch analysis has been fully reviewed
  - review_status_updated_at: Time the review status was last updated; N/A if not reviewed yet
  - metadata: Dictionary of custom metadata key-value pairs (empty dict if no metadata exists)
  - Configuration details (for Ask batches: prompt, category, etc.)
- results: List of per-video analysis dictionaries
  - video_id: ID of the video
  - analysis_id: ID of the analysis
  - mode: Analysis mode used
  - status: Analysis status
  - events: List of detected events (filtered by approval status if specified)
  - Additional fields depending on analysis type
Returns (as_csv=True): str (CSV text)
- Header row is always included.
- Rows are event-level (one row per event).
- Current CSV columns: Query, Video, Approval Status, Timestamp, Category, Label, AI Analysis, Severity, Video ID, Analysis ID, Batch ID, Batch Viewer URL, Status, Confidence, Annotated Thumbnail URL, Import Source URI, Summary
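The CSV text parses cleanly with the standard library, since the header row is always present. A sketch; `events_from_batch_csv` is our helper, and the sample below uses only a subset of the columns for illustration:

```python
import csv
import io
from typing import Dict, List

def events_from_batch_csv(csv_text: str) -> List[Dict[str, str]]:
    """Parse get_batch_analysis(..., as_csv=True) output into per-event dicts keyed by header."""
    return list(csv.DictReader(io.StringIO(csv_text)))
```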
Raises:
- NomadicMLError: If the batch is not completed or other API errors occur
- ValidationError: If filter contains invalid values
The batch must be completed before you can retrieve its results. If you need to check batch status first, use the batch viewer URL.
add_batch_metadata()
Add or update custom metadata for a batch analysis. Metadata is stored as key-value pairs and can be used to track experiments, versions, or any custom information about your batch runs.
# Add metadata to a batch
client.add_batch_metadata(
"batch_id",
{
"experiment_id": "exp-001",
"version": 2,
"model": "Nomadic-VL-XLarge",
"notes": "Test run with new parameters"
}
)
# Update existing metadata (new keys will be merged, existing keys overwritten)
client.add_batch_metadata(
"batch_id",
{
"version": 3,
"status": "completed"
}
)
# Retrieve metadata later
batch_results = client.get_batch_analysis("batch_id")
metadata = batch_results["batch_metadata"]["metadata"]
print(f"Experiment: {metadata.get('experiment_id')}")
print(f"Version: {metadata.get('version')}")
Required Parameters:
| Parameter | Type | Description |
|---|---|---|
| batch_id | str | ID of the batch to update (required) |
| metadata | Dict[str, Union[str, int]] | Dictionary with string keys and string/int values (non-nested) |
Returns: Dict with success status and updated metadata:
- success: Boolean indicating if the operation succeeded
- batch_id: The batch identifier
- metadata: Complete metadata dictionary after merge
Raises:
- ValidationError: If the metadata format is invalid (e.g., nested objects, non-string keys, invalid value types)
- NomadicMLError: If the batch is not found or you don’t have permission to modify it
Only the batch owner can add or update metadata. New metadata keys are merged with existing metadata, with new values overwriting existing keys of the same name. Metadata values must be strings or integers only; nested objects, arrays, booleans, and null values are not supported.
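Those constraints can be checked client-side before calling the API, avoiding a round trip for a guaranteed ValidationError. A sketch based on the rules above; `validate_batch_metadata` is our helper, not part of the SDK:

```python
from typing import Dict, Union

def validate_batch_metadata(metadata: Dict[str, Union[str, int]]) -> None:
    """Raise ValueError for metadata the API would reject, per the constraints above."""
    if not isinstance(metadata, dict):
        raise ValueError("metadata must be a dict")
    for key, value in metadata.items():
        if not isinstance(key, str):
            raise ValueError(f"key {key!r} must be a string")
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, (str, int)):
            raise ValueError(f"value for {key!r} must be a string or int")
```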
Thumbnails
get_visuals()
Retrieve thumbnail URLs for all events in an analysis. If thumbnails don’t exist, they will be generated automatically.
# Get all thumbnail URLs from an analysis
visuals = client.get_visuals("video_id", "analysis_id")
# Returns list of thumbnail URLs
# ['https://storage.googleapis.com/.../event_0_thumb.jpg',
# 'https://storage.googleapis.com/.../event_1_thumb.jpg']
Parameters:
| Parameter | Type | Description |
|---|---|---|
| video_id | str | ID of the video (required) |
| analysis_id | str | ID of the analysis containing events (required) |
Returns: List[str] of thumbnail URLs with bounding box annotations
get_visual()
Retrieve a single thumbnail URL for a specific event in an analysis.
# Get thumbnail for the first event (index 0)
thumbnail_url = client.get_visual("video_id", "analysis_id", 0)
# Get thumbnail for the third event (index 2)
thumbnail_url = client.get_visual("video_id", "analysis_id", 2)
Parameters:
| Parameter | Type | Description |
|---|---|---|
| video_id | str | ID of the video (required) |
| analysis_id | str | ID of the analysis containing events (required) |
| event_idx | int | 0-based index of the event (required) |
Returns: str - Single thumbnail URL
Raises: ValueError if event index is out of range
For a step-by-step tutorial, head over to SDK Usage Examples.