Working with External Files
Working with External Files¶
In Pixeltable, all media data (videos, images, audio) resides in external files, and Pixeltable stores references to those. The files can be local or remote (e.g., in S3). For the latter, Pixeltable automatically caches the files locally on access.
When interacting with media data via Pixeltable, either through queries or UDFs, the user sees the following Python types:
ImageType
:PIL.Image.Image
VideoType
:string
(local path)AudioType
:string
(local path)
Let's create a table and load some data to see what that looks like:
import tempfile
import random
import shutil
import pixeltable as pxt
pxt.drop_dir('external_data', force=True) # Ensure a clean slate for the tutorial
pxt.create_dir('external_data')
Connected to Pixeltable database at: postgresql://postgres:@/pixeltable?host=/Users/orm/.pixeltable/pgdata
pxt.drop_table('external_data.videos', ignore_errors=True)
v = pxt.create_table('external_data.videos', {'video': pxt.VideoType()})
prefix = 's3://multimedia-commons/'
paths = [
'data/videos/mp4/ffe/ffb/ffeffbef41bbc269810b2a1a888de.mp4',
'data/videos/mp4/ffe/feb/ffefebb41485539f964760e6115fbc44.mp4',
'data/videos/mp4/ffe/f73/ffef7384d698b5f70d411c696247169.mp4'
]
v.insert({'video': prefix + p} for p in paths)
Created table `videos`.
Inserting rows into `videos`: 0 rows [00:00, ? rows/s]
Inserting rows into `videos`: 3 rows [00:00, 897.69 rows/s]
Inserted 3 rows with 0 errors.
UpdateStatus(num_rows=3, num_computed_values=0, num_excs=0, updated_cols=[], cols_with_excs=[])
We just inserted 3 rows with video files residing in S3. When we now query these, we are presented with their locally cached counterparts.
(Note: we don't simply display the output of collect()
here, because that is formatted as an HTML table with a media player and so would obscure the file path.)
rows = list(v.select(v.video).collect())
rows[0]
{'video': '/Users/orm/.pixeltable/file_cache/970801cdb9664e309e1bf50429103896_0_1fcfcb221263cff76a2853250fbbb2e90375dd495454c0007bc6ff4430c9a4a7.mp4'}
Let's make a local copy of the first file and insert that separately. First, the copy:
local_path = tempfile.mktemp(suffix='.mp4')
shutil.copyfile(rows[0]['video'], local_path)
local_path
'/var/folders/8v/d886z5j13dsctyjpw29t7y480000gn/T/tmpmeecpn92.mp4'
Now the insert:
v.insert(video=local_path)
Inserting rows into `videos`: 0 rows [00:00, ? rows/s]
Inserting rows into `videos`: 1 rows [00:00, 1396.70 rows/s]
Inserted 1 row with 0 errors.
UpdateStatus(num_rows=1, num_computed_values=0, num_excs=0, updated_cols=[], cols_with_excs=[])
When we query this again, we see that local paths are preserved:
rows = list(v.select(v.video).collect())
rows
[{'video': '/Users/orm/.pixeltable/file_cache/970801cdb9664e309e1bf50429103896_0_1fcfcb221263cff76a2853250fbbb2e90375dd495454c0007bc6ff4430c9a4a7.mp4'}, {'video': '/Users/orm/.pixeltable/file_cache/970801cdb9664e309e1bf50429103896_0_fc11428b32768ae782193a57ebcbad706f45bbd9fa13354471e0bcd798fee3ea.mp4'}, {'video': '/Users/orm/.pixeltable/file_cache/970801cdb9664e309e1bf50429103896_0_b9fb0d9411bc9cd183a36866911baa7a8834f22f665bce47608566b38485c16a.mp4'}, {'video': '/var/folders/8v/d886z5j13dsctyjpw29t7y480000gn/T/tmpmeecpn92.mp4'}]
UDFs also see local paths:
@pxt.udf(param_types=[pxt.VideoType()])
def f(v: str) -> int:
print(f'{type(v)}: {v}')
return 1
v.select(f(v.video)).show()
<class 'str'>: /Users/orm/.pixeltable/file_cache/970801cdb9664e309e1bf50429103896_0_1fcfcb221263cff76a2853250fbbb2e90375dd495454c0007bc6ff4430c9a4a7.mp4 <class 'str'>: /Users/orm/.pixeltable/file_cache/970801cdb9664e309e1bf50429103896_0_fc11428b32768ae782193a57ebcbad706f45bbd9fa13354471e0bcd798fee3ea.mp4 <class 'str'>: /Users/orm/.pixeltable/file_cache/970801cdb9664e309e1bf50429103896_0_b9fb0d9411bc9cd183a36866911baa7a8834f22f665bce47608566b38485c16a.mp4 <class 'str'>: /var/folders/8v/d886z5j13dsctyjpw29t7y480000gn/T/tmpmeecpn92.mp4
col_0 |
---|
1 |
1 |
1 |
1 |
Dealing with errors¶
When interacting with media data in Pixeltable, the user can assume that the underlying files exist, are local and are valid for their respective data type. In other words, the user doesn't need to consider error conditions.
To that end, Pixeltable validates media data on ingest. The default behavior is to reject invalid media files:
v.insert(video=prefix + 'bad_path.mp4')
--------------------------------------------------------------------------- Error Traceback (most recent call last) Cell In[9], line 1 ----> 1 v.insert(video=prefix + 'bad_path.mp4') File ~/repos/pixeltable/pixeltable/catalog/insertable_table.py:134, in InsertableTable.insert(self, rows, print_stats, fail_on_exception, **kwargs) 132 raise excs.Error('rows must be a list of dictionaries') 133 self._validate_input_rows(rows) --> 134 result = self.tbl_version.insert(rows, print_stats=print_stats, fail_on_exception=fail_on_exception) 136 if result.num_excs == 0: 137 cols_with_excs_str = '' File ~/repos/pixeltable/pixeltable/catalog/table_version.py:600, in TableVersion.insert(self, rows, print_stats, fail_on_exception) 598 ts = time.time() 599 with Env.get().engine.begin() as conn: --> 600 return self._insert(plan, conn, ts, print_stats) File ~/repos/pixeltable/pixeltable/catalog/table_version.py:609, in TableVersion._insert(self, exec_plan, conn, ts, print_stats) 607 self.version += 1 608 result = UpdateStatus() --> 609 num_rows, num_excs, cols_with_excs = self.store_tbl.insert_rows(exec_plan, conn, v_min=self.version) 610 self.next_rowid = num_rows 611 result.num_rows = num_rows File ~/repos/pixeltable/pixeltable/store.py:285, in StoreBase.insert_rows(self, exec_plan, conn, v_min) 283 try: 284 exec_plan.open() --> 285 for row_batch in exec_plan: 286 num_rows += len(row_batch) 287 for batch_start_idx in range(0, len(row_batch), batch_size): 288 # compute batch of rows and convert them into table rows File ~/repos/pixeltable/pixeltable/exec/media_validation_node.py:26, in MediaValidationNode.__next__(self) 24 def __next__(self) -> DataRowBatch: 25 assert self.input is not None ---> 26 row_batch = next(self.input) 27 for row in row_batch: 28 for slot_idx, col in [(c.slot_idx, c.col) for c in self.media_slots]: File ~/repos/pixeltable/pixeltable/exec/cache_prefetch_node.py:70, in CachePrefetchNode.__next__(self) 67 futures[executor.submit(self._fetch_url, row, info.slot_idx)] = (row, info) 68 for future in concurrent.futures.as_completed(futures): 69 # TODO: does this need to deal with recoverable errors (such as retry after throttling)? ---> 70 tmp_path = future.result() 71 if tmp_path is None: 72 continue File ~/mambaforge/envs/pixeltable_39/lib/python3.9/concurrent/futures/_base.py:439, in Future.result(self, timeout) 437 raise CancelledError() 438 elif self._state == FINISHED: --> 439 return self.__get_result() 441 self._condition.wait(timeout) 443 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: File ~/mambaforge/envs/pixeltable_39/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self) 389 if self._exception: 390 try: --> 391 raise self._exception 392 finally: 393 # Break a reference cycle with the exception in self._exception 394 self = None File ~/mambaforge/envs/pixeltable_39/lib/python3.9/concurrent/futures/thread.py:58, in _WorkItem.run(self) 55 return 57 try: ---> 58 result = self.fn(*self.args, **self.kwargs) 59 except BaseException as exc: 60 self.future.set_exception(exc) File ~/repos/pixeltable/pixeltable/exec/cache_prefetch_node.py:114, in CachePrefetchNode._fetch_url(self, row, slot_idx) 112 self.row_builder.set_exc(row, slot_idx, exc) 113 if not self.ctx.ignore_errors: --> 114 raise exc from None # suppress original exception 115 return None Error: Failed to download s3://multimedia-commons/bad_path.mp4: An error occurred (404) when calling the HeadObject operation: Not Found
The same happens for corrupted files:
# create invalid .mp4
with tempfile.NamedTemporaryFile(mode='wb', suffix='.mp4', delete=False) as temp_file:
temp_file.write(random.randbytes(1024))
corrupted_path = temp_file.name
v.insert(video=corrupted_path)
--------------------------------------------------------------------------- Error Traceback (most recent call last) Cell In[10], line 6 3 temp_file.write(random.randbytes(1024)) 4 corrupted_path = temp_file.name ----> 6 v.insert(video=corrupted_path) File ~/repos/pixeltable/pixeltable/catalog/insertable_table.py:134, in InsertableTable.insert(self, rows, print_stats, fail_on_exception, **kwargs) 132 raise excs.Error('rows must be a list of dictionaries') 133 self._validate_input_rows(rows) --> 134 result = self.tbl_version.insert(rows, print_stats=print_stats, fail_on_exception=fail_on_exception) 136 if result.num_excs == 0: 137 cols_with_excs_str = '' File ~/repos/pixeltable/pixeltable/catalog/table_version.py:600, in TableVersion.insert(self, rows, print_stats, fail_on_exception) 598 ts = time.time() 599 with Env.get().engine.begin() as conn: --> 600 return self._insert(plan, conn, ts, print_stats) File ~/repos/pixeltable/pixeltable/catalog/table_version.py:609, in TableVersion._insert(self, exec_plan, conn, ts, print_stats) 607 self.version += 1 608 result = UpdateStatus() --> 609 num_rows, num_excs, cols_with_excs = self.store_tbl.insert_rows(exec_plan, conn, v_min=self.version) 610 self.next_rowid = num_rows 611 result.num_rows = num_rows File ~/repos/pixeltable/pixeltable/store.py:285, in StoreBase.insert_rows(self, exec_plan, conn, v_min) 283 try: 284 exec_plan.open() --> 285 for row_batch in exec_plan: 286 num_rows += len(row_batch) 287 for batch_start_idx in range(0, len(row_batch), batch_size): 288 # compute batch of rows and convert them into table rows File ~/repos/pixeltable/pixeltable/exec/media_validation_node.py:41, in MediaValidationNode.__next__(self) 39 self.row_builder.set_exc(row, slot_idx, exc) 40 if not self.ctx.ignore_errors: ---> 41 raise exc 43 return row_batch File ~/repos/pixeltable/pixeltable/exec/media_validation_node.py:37, in MediaValidationNode.__next__(self) 34 continue 36 try: ---> 37 col.col_type.validate_media(path) 38 except excs.Error as exc: 39 self.row_builder.set_exc(row, slot_idx, exc) File ~/repos/pixeltable/pixeltable/type_system.py:783, in VideoType.validate_media(self, val) 781 raise excs.Error(f'Not a valid video: {val}') 782 except av.AVError: --> 783 raise excs.Error(f'Not a valid video: {val}') from None Error: Not a valid video: /var/folders/8v/d886z5j13dsctyjpw29t7y480000gn/T/tmpis_08o50.mp4
Alternatively, Pixeltable can also be instructed to record error conditions and proceed with the ingest, via the fail_on_exception
flag (default: True
):
v.insert([{'video': prefix + 'bad_path.mp4'}, {'video': corrupted_path}], fail_on_exception=False)
Inserting rows into `videos`: 0 rows [00:00, ? rows/s]
Inserting rows into `videos`: 2 rows [00:00, 621.89 rows/s]
Inserted 2 rows with 2 errors across 1 column (videos.video).
UpdateStatus(num_rows=2, num_computed_values=0, num_excs=2, updated_cols=[], cols_with_excs=['videos.video'])
Every media column has properties errortype
and errormsg
(both containing string
data) that indicate whether the column value is valid. Invalid values show up as None
and have non-null errortype
/errormsg
:
v.select(v.video == None, v.video.errortype, v.video.errormsg).collect()
col_0 | video_errortype | video_errormsg |
---|---|---|
False | None | None |
False | None | None |
False | None | None |
False | None | None |
True | Error | Failed to download s3://multimedia-commons/bad_path.mp4: An error occurred (404) when calling the HeadObject operation: Not Found |
True | Error | Not a valid video: /var/folders/8v/d886z5j13dsctyjpw29t7y480000gn/T/tmpis_08o50.mp4 |
Errors can now be inspected (and corrected) after the ingest:
v.where(v.video.errortype != None).select(v.video.errormsg).collect()
video_errormsg |
---|
Failed to download s3://multimedia-commons/bad_path.mp4: An error occurred (404) when calling the HeadObject operation: Not Found |
Not a valid video: /var/folders/8v/d886z5j13dsctyjpw29t7y480000gn/T/tmpis_08o50.mp4 |
Accessing the original file paths¶
In some cases, it will be necessary to access file paths (not, say, the PIL.Image.Image
), and Pixeltable provides the column properties fileurl
and localpath
for that purpose:
v.select(v.video.fileurl, v.video.localpath).collect()
video_fileurl | video_localpath |
---|---|
s3://multimedia-commons/data/videos/mp4/ffe/ffb/ffeffbef41bbc269810b2a1a888de.mp4 | /Users/orm/.pixeltable/file_cache/970801cdb9664e309e1bf50429103896_0_1fcfcb221263cff76a2853250fbbb2e90375dd495454c0007bc6ff4430c9a4a7.mp4 |
s3://multimedia-commons/data/videos/mp4/ffe/feb/ffefebb41485539f964760e6115fbc44.mp4 | /Users/orm/.pixeltable/file_cache/970801cdb9664e309e1bf50429103896_0_fc11428b32768ae782193a57ebcbad706f45bbd9fa13354471e0bcd798fee3ea.mp4 |
s3://multimedia-commons/data/videos/mp4/ffe/f73/ffef7384d698b5f70d411c696247169.mp4 | /Users/orm/.pixeltable/file_cache/970801cdb9664e309e1bf50429103896_0_b9fb0d9411bc9cd183a36866911baa7a8834f22f665bce47608566b38485c16a.mp4 |
file:///var/folders/8v/d886z5j13dsctyjpw29t7y480000gn/T/tmpmeecpn92.mp4 | /var/folders/8v/d886z5j13dsctyjpw29t7y480000gn/T/tmpmeecpn92.mp4 |
None | None |
None | None |
Note that for local media files, the fileurl
property still returns a parsable URL.
Updated 2 months ago