Working with External Files


In Pixeltable, all media data (videos, images, audio) resides in external files, and Pixeltable stores references to those. The files can be local or remote (e.g., in S3). For the latter, Pixeltable automatically caches the files locally on access.

When interacting with media data via Pixeltable, either through queries or UDFs, the user sees the following Python types:

  • ImageType: PIL.Image.Image
  • VideoType: string (local path)
  • AudioType: string (local path)

Let's create a table and load some data to see what that looks like:

%pip install -qU pixeltable boto3
import tempfile
import random
import shutil
import pixeltable as pxt

# First drop the `external_data` directory if it exists, to ensure
# a clean environment for the demo
pxt.drop_dir('external_data', force=True)
pxt.create_dir('external_data')
Connected to Pixeltable database at: postgresql+psycopg://postgres:@/pixeltable?host=/Users/asiegel/.pixeltable/pgdata
Created directory `external_data`.

<pixeltable.catalog.dir.Dir at 0x176646bb0>
v = pxt.create_table('external_data.videos', {'video': pxt.Video})

prefix = 's3://multimedia-commons/'
paths = [
    'data/videos/mp4/ffe/ffb/ffeffbef41bbc269810b2a1a888de.mp4',
    'data/videos/mp4/ffe/feb/ffefebb41485539f964760e6115fbc44.mp4',
    'data/videos/mp4/ffe/f73/ffef7384d698b5f70d411c696247169.mp4'
]
# Pass the rows as a list of dictionaries, one per video
v.insert([{'video': prefix + p} for p in paths])
Created table `videos`.
Computing cells:   0%|                                                    | 0/6 [00:00<?, ? cells/s]
Inserting rows into `videos`: 3 rows [00:00, 1004.62 rows/s]
Computing cells: 100%|████████████████████████████████████████████| 6/6 [00:00<00:00, 79.14 cells/s]
Inserted 3 rows with 0 errors.

UpdateStatus(num_rows=3, num_computed_values=6, num_excs=0, updated_cols=[], cols_with_excs=[])

UpdateStatus(num_rows=3, num_computed_values=0, num_excs=0, updated_cols=[], cols_with_excs=[])

We just inserted 3 rows with video files residing in S3. When we now query these, we are presented with their locally cached counterparts.

(Note: we don't simply display the output of collect() here, because that is formatted as an HTML table with a media player and so would obscure the file path.)

rows = list(v.select(v.video).collect())
rows[0]
{'video': '/Users/asiegel/.pixeltable/file_cache/682f022a704d4459adb2f29f7fe9577c_0_1fcfcb221263cff76a2853250fbbb2e90375dd495454c0007bc6ff4430c9a4a7.mp4'}

Let's make a local copy of the first file and insert that separately. First, the copy:

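# NamedTemporaryFile creates the file safely; tempfile.mktemp() is deprecated
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
    local_path = tmp.name
shutil.copyfile(rows[0]['video'], local_path)
local_path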
'/var/folders/hb/qd0dztsj43j_mdb6hbl1gzyc0000gn/T/tmp1jo4a7ca.mp4'

Now the insert:

v.insert(video=local_path)
Computing cells:   0%|                                                    | 0/2 [00:00<?, ? cells/s]
Inserting rows into `videos`: 1 rows [00:00, 725.78 rows/s]
Computing cells: 100%|████████████████████████████████████████████| 2/2 [00:00<00:00, 53.23 cells/s]
Inserted 1 row with 0 errors.

UpdateStatus(num_rows=1, num_computed_values=2, num_excs=0, updated_cols=[], cols_with_excs=[])

When we query this again, we see that local paths are preserved:

rows = list(v.select(v.video).collect())
rows
[{'video': '/Users/asiegel/.pixeltable/file_cache/682f022a704d4459adb2f29f7fe9577c_0_1fcfcb221263cff76a2853250fbbb2e90375dd495454c0007bc6ff4430c9a4a7.mp4'},
 {'video': '/Users/asiegel/.pixeltable/file_cache/682f022a704d4459adb2f29f7fe9577c_0_fc11428b32768ae782193a57ebcbad706f45bbd9fa13354471e0bcd798fee3ea.mp4'},
 {'video': '/Users/asiegel/.pixeltable/file_cache/682f022a704d4459adb2f29f7fe9577c_0_b9fb0d9411bc9cd183a36866911baa7a8834f22f665bce47608566b38485c16a.mp4'},
 {'video': '/var/folders/hb/qd0dztsj43j_mdb6hbl1gzyc0000gn/T/tmp1jo4a7ca.mp4'}]

UDFs also see local paths:

@pxt.udf
def f(video_path: pxt.Video) -> int:
    # Video arguments arrive as local file paths (str)
    print(f'{type(video_path)}: {video_path}')
    return 1

v.select(f(v.video)).show()
<class 'str'>: /Users/asiegel/.pixeltable/file_cache/682f022a704d4459adb2f29f7fe9577c_0_1fcfcb221263cff76a2853250fbbb2e90375dd495454c0007bc6ff4430c9a4a7.mp4
<class 'str'>: /Users/asiegel/.pixeltable/file_cache/682f022a704d4459adb2f29f7fe9577c_0_fc11428b32768ae782193a57ebcbad706f45bbd9fa13354471e0bcd798fee3ea.mp4
<class 'str'>: /Users/asiegel/.pixeltable/file_cache/682f022a704d4459adb2f29f7fe9577c_0_b9fb0d9411bc9cd183a36866911baa7a8834f22f665bce47608566b38485c16a.mp4
<class 'str'>: /var/folders/hb/qd0dztsj43j_mdb6hbl1gzyc0000gn/T/tmp1jo4a7ca.mp4
f
1
1
1
1
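
Because a video argument arrives as an ordinary local path string, standard file APIs can be applied to it directly. The sketch below is a plain Python function, not part of Pixeltable; the name file_size_bytes is made up for illustration, and a temporary file stands in for a cached video. Decorating such a function with @pxt.udf, as with f above, is the natural next step, but that is an assumption we don't verify here.

```python
import os
import tempfile


def file_size_bytes(video_path: str) -> int:
    """Return the size in bytes of the file at a local path."""
    return os.path.getsize(video_path)


# Stand-in for a cached video: a temporary file holding 1024 zero bytes.
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
    tmp.write(bytes(1024))
    demo_path = tmp.name

demo_size = file_size_bytes(demo_path)
os.remove(demo_path)  # clean up the stand-in file
```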

Dealing with errors

When interacting with media data in Pixeltable, the user can assume that the underlying files exist, are local, and are valid for their respective data type. In other words, code that reads media data doesn't need to handle missing or corrupted files.

To that end, Pixeltable validates media data on ingest. The default behavior is to reject invalid media files:

v.insert(video=prefix + 'bad_path.mp4')
Computing cells:   0%|                                                    | 0/2 [00:01<?, ? cells/s]



---------------------------------------------------------------------------

Error                                     Traceback (most recent call last)

Cell In[9], line 1
----> 1 v.insert(video=prefix + 'bad_path.mp4')


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/catalog/insertable_table.py:125, in InsertableTable.insert(self, rows, print_stats, on_error, **kwargs)
    123         raise excs.Error('rows must be a list of dictionaries')
    124 self._validate_input_rows(rows)
--> 125 status = self._tbl_version.insert(rows, None, print_stats=print_stats, fail_on_exception=fail_on_exception)
    127 if status.num_excs == 0:
    128     cols_with_excs_str = ''


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/catalog/table_version.py:723, in TableVersion.insert(self, rows, df, conn, print_stats, fail_on_exception)
    721 if conn is None:
    722     with Env.get().engine.begin() as conn:
--> 723         return self._insert(
    724             plan, conn, time.time(), print_stats=print_stats, rowids=rowids(), abort_on_exc=fail_on_exception)
    725 else:
    726     return self._insert(
    727         plan, conn, time.time(), print_stats=print_stats, rowids=rowids(), abort_on_exc=fail_on_exception)


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/catalog/table_version.py:737, in TableVersion._insert(self, exec_plan, conn, timestamp, rowids, print_stats, abort_on_exc)
    735 self.version += 1
    736 result = UpdateStatus()
--> 737 num_rows, num_excs, cols_with_excs = self.store_tbl.insert_rows(
    738     exec_plan, conn, v_min=self.version, rowids=rowids, abort_on_exc=abort_on_exc)
    739 result.num_rows = num_rows
    740 result.num_excs = num_excs


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/store.py:323, in StoreBase.insert_rows(self, exec_plan, conn, v_min, show_progress, rowids, abort_on_exc)
    321 try:
    322     exec_plan.open()
--> 323     for row_batch in exec_plan:
    324         num_rows += len(row_batch)
    325         for batch_start_idx in range(0, len(row_batch), self.__INSERT_BATCH_SIZE):
    326             # compute batch of rows and convert them into table rows


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/exec/expr_eval_node.py:45, in ExprEvalNode.__next__(self)
     44 def __next__(self) -> DataRowBatch:
---> 45     input_batch = next(self.input)
     46     # compute target exprs
     47     for cohort in self.cohorts:


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/exec/cache_prefetch_node.py:71, in CachePrefetchNode.__next__(self)
     68     futures[executor.submit(self._fetch_url, row, info.slot_idx)] = (row, info)
     69 for future in concurrent.futures.as_completed(futures):
     70     # TODO:  does this need to deal with recoverable errors (such as retry after throttling)?
---> 71     tmp_path = future.result()
     72     if tmp_path is None:
     73         continue


File /opt/miniconda3/envs/pxt/lib/python3.9/concurrent/futures/_base.py:439, in Future.result(self, timeout)
    437     raise CancelledError()
    438 elif self._state == FINISHED:
--> 439     return self.__get_result()
    441 self._condition.wait(timeout)
    443 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:


File /opt/miniconda3/envs/pxt/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None


File /opt/miniconda3/envs/pxt/lib/python3.9/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/exec/cache_prefetch_node.py:115, in CachePrefetchNode._fetch_url(self, row, slot_idx)
    113     self.row_builder.set_exc(row, slot_idx, exc)
    114     if not self.ctx.ignore_errors:
--> 115         raise exc from None  # suppress original exception
    116 return None


Error: Failed to download s3://multimedia-commons/bad_path.mp4: An error occurred (404) when calling the HeadObject operation: Not Found

The same happens for corrupted files:

# create invalid .mp4
with tempfile.NamedTemporaryFile(mode='wb', suffix='.mp4', delete=False) as temp_file:
    temp_file.write(random.randbytes(1024))
    corrupted_path = temp_file.name

v.insert(video=corrupted_path)
Computing cells: 100%|██████████████████████████████████████████| 2/2 [00:00<00:00, 1084.64 cells/s]



---------------------------------------------------------------------------

Error                                     Traceback (most recent call last)

Cell In[10], line 6
      3     temp_file.write(random.randbytes(1024))
      4     corrupted_path = temp_file.name
----> 6 v.insert(video=corrupted_path)


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/catalog/insertable_table.py:125, in InsertableTable.insert(self, rows, print_stats, on_error, **kwargs)
    123         raise excs.Error('rows must be a list of dictionaries')
    124 self._validate_input_rows(rows)
--> 125 status = self._tbl_version.insert(rows, None, print_stats=print_stats, fail_on_exception=fail_on_exception)
    127 if status.num_excs == 0:
    128     cols_with_excs_str = ''


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/catalog/table_version.py:723, in TableVersion.insert(self, rows, df, conn, print_stats, fail_on_exception)
    721 if conn is None:
    722     with Env.get().engine.begin() as conn:
--> 723         return self._insert(
    724             plan, conn, time.time(), print_stats=print_stats, rowids=rowids(), abort_on_exc=fail_on_exception)
    725 else:
    726     return self._insert(
    727         plan, conn, time.time(), print_stats=print_stats, rowids=rowids(), abort_on_exc=fail_on_exception)


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/catalog/table_version.py:737, in TableVersion._insert(self, exec_plan, conn, timestamp, rowids, print_stats, abort_on_exc)
    735 self.version += 1
    736 result = UpdateStatus()
--> 737 num_rows, num_excs, cols_with_excs = self.store_tbl.insert_rows(
    738     exec_plan, conn, v_min=self.version, rowids=rowids, abort_on_exc=abort_on_exc)
    739 result.num_rows = num_rows
    740 result.num_excs = num_excs


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/store.py:334, in StoreBase.insert_rows(self, exec_plan, conn, v_min, show_progress, rowids, abort_on_exc)
    332 if abort_on_exc and row.has_exc():
    333     exc = row.get_first_exc()
--> 334     raise exc
    336 rowid = (next(rowids),) if rowids is not None else row.pk[:-1]
    337 pk = rowid + (v_min,)


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/exprs/column_ref.py:159, in ColumnRef.eval(self, data_row, row_builder)
    156     return
    158 try:
--> 159     self.col.col_type.validate_media(data_row.file_paths[unvalidated_slot_idx])
    160     # access the value only after successful validation
    161     val = data_row[unvalidated_slot_idx]


File ~/Dropbox/workspace/pixeltable/pixeltable/pixeltable/type_system.py:906, in VideoType.validate_media(self, val)
    904             raise excs.Error(f'Not a valid video: {val}')
    905 except av.AVError:
--> 906     raise excs.Error(f'Not a valid video: {val}') from None


Error: Not a valid video: /var/folders/hb/qd0dztsj43j_mdb6hbl1gzyc0000gn/T/tmp3djgfyjp.mp4
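
If we'd rather catch obviously broken files before calling insert(), a cheap pre-check is possible with the standard library alone. The sketch below is not what Pixeltable does internally (the traceback above shows its validation going through VideoType.validate_media); it is a heuristic that relies on MP4 files normally starting with an `ftyp` box. The helper names looks_like_mp4 and write_temp are hypothetical, and passing the check does not guarantee a playable video.

```python
import os
import tempfile


def looks_like_mp4(path: str) -> bool:
    """Cheap heuristic: MP4 files normally begin with a box whose type,
    stored in bytes 4-7, is b'ftyp'. Passing this check does not prove
    that the file is a playable video."""
    with open(path, 'rb') as fh:
        header = fh.read(8)
    return len(header) == 8 and header[4:8] == b'ftyp'


def write_temp(data: bytes) -> str:
    """Write data to a new temporary .mp4 file and return its path."""
    with tempfile.NamedTemporaryFile(mode='wb', suffix='.mp4', delete=False) as fh:
        fh.write(data)
        return fh.name


# One file that starts with a plausible `ftyp` box, and one that does not.
plausible_path = write_temp(b'\x00\x00\x00\x18ftypisom' + bytes(16))
garbage_path = write_temp(bytes(1024))

plausible_ok = looks_like_mp4(plausible_path)
garbage_ok = looks_like_mp4(garbage_path)

for p in (plausible_path, garbage_path):
    os.remove(p)  # clean up the demo files
```

A file that fails this check can be skipped or logged up front; anything that passes still goes through Pixeltable's own validation on ingest.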

Alternatively, Pixeltable can be instructed to record error conditions and proceed with the ingest by passing on_error='ignore' (the default is 'abort'):

v.insert([{'video': prefix + 'bad_path.mp4'}, {'video': corrupted_path}], on_error='ignore')
Computing cells: 100%|████████████████████████████████████████████| 4/4 [00:00<00:00, 20.98 cells/s]
Inserting rows into `videos`: 2 rows [00:00, 671.63 rows/s]
Computing cells: 100%|████████████████████████████████████████████| 4/4 [00:00<00:00, 20.13 cells/s]
Inserted 2 rows with 4 errors across 2 columns (videos.video, videos.None).

UpdateStatus(num_rows=2, num_computed_values=4, num_excs=4, updated_cols=[], cols_with_excs=['videos.video', 'videos.None'])

Every media column has properties errortype and errormsg (both containing string data) that indicate whether the column value is valid. Invalid values show up as None and have non-null errortype/errormsg:

v.select(v.video == None, v.video.errortype, v.video.errormsg).collect()
col_0  video_errortype  video_errormsg
False  None             None
False  None             None
False  None             None
False  None             None
True   Error            Failed to download s3://multimedia-commons/bad_path.mp4: An error occurred (404) when calling the HeadObject operation: Not Found
True   Error            Not a valid video: /var/folders/hb/qd0dztsj43j_mdb6hbl1gzyc0000gn/T/tmp3djgfyjp.mp4

Errors can now be inspected (and corrected) after the ingest:

v.where(v.video.errortype != None).select(v.video.errormsg).collect()
video_errormsg
Failed to download s3://multimedia-commons/bad_path.mp4: An error occurred (404) when calling the HeadObject operation: Not Found
Not a valid video: /var/folders/hb/qd0dztsj43j_mdb6hbl1gzyc0000gn/T/tmp3djgfyjp.mp4
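
Once the error messages are in hand, ordinary Python is enough to triage them. The sketch below groups messages by their prefix, so that failed downloads can be retried and invalid files replaced. The rows are hard-coded copies of the output above, the dictionary key video_errormsg is assumed to match the column header shown there, and classify is a hypothetical helper.

```python
from collections import defaultdict

# Hypothetical rows, shaped like the result of
# list(v.where(v.video.errortype != None).select(v.video.errormsg).collect())
error_rows = [
    {'video_errormsg': 'Failed to download s3://multimedia-commons/bad_path.mp4: '
                       'An error occurred (404) when calling the HeadObject operation: Not Found'},
    {'video_errormsg': 'Not a valid video: '
                       '/var/folders/hb/qd0dztsj43j_mdb6hbl1gzyc0000gn/T/tmp3djgfyjp.mp4'},
]


def classify(msg: str) -> str:
    """Map an error message to a coarse category based on its prefix."""
    if msg.startswith('Failed to download'):
        return 'download_failed'
    if msg.startswith('Not a valid'):
        return 'invalid_media'
    return 'other'


# Group the messages by category.
by_kind = defaultdict(list)
for row in error_rows:
    msg = row['video_errormsg']
    by_kind[classify(msg)].append(msg)
```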

Accessing the original file paths

In some cases you need the file path or URL itself rather than the loaded value (say, a PIL.Image.Image). Pixeltable provides the column properties fileurl and localpath for that purpose:

v.select(v.video.fileurl, v.video.localpath).collect()
video_fileurl video_localpath
s3://multimedia-commons/data/videos/mp4/ffe/ffb/ffeffbef41bbc269810b2a1a888de.mp4 /Users/asiegel/.pixeltable/file_cache/682f022a704d4459adb2f29f7fe9577c_0_1fcfcb221263cff76a2853250fbbb2e90375dd495454c0007bc6ff4430c9a4a7.mp4
s3://multimedia-commons/data/videos/mp4/ffe/feb/ffefebb41485539f964760e6115fbc44.mp4 /Users/asiegel/.pixeltable/file_cache/682f022a704d4459adb2f29f7fe9577c_0_fc11428b32768ae782193a57ebcbad706f45bbd9fa13354471e0bcd798fee3ea.mp4
s3://multimedia-commons/data/videos/mp4/ffe/f73/ffef7384d698b5f70d411c696247169.mp4 /Users/asiegel/.pixeltable/file_cache/682f022a704d4459adb2f29f7fe9577c_0_b9fb0d9411bc9cd183a36866911baa7a8834f22f665bce47608566b38485c16a.mp4
file:///var/folders/hb/qd0dztsj43j_mdb6hbl1gzyc0000gn/T/tmp1jo4a7ca.mp4 /var/folders/hb/qd0dztsj43j_mdb6hbl1gzyc0000gn/T/tmp1jo4a7ca.mp4
None None
None None

Note that for local media files, the fileurl property still returns a parsable URL (with the file:// scheme, as in the fourth row above).
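
Since every fileurl value is a well-formed URL, the standard library can take it apart regardless of where the file lives. The sketch below uses urllib.parse on two URLs copied from the output above; describe_url is a hypothetical helper, and the path handling assumes POSIX-style paths (on Windows, urllib.request.url2pathname would be the better choice for file URLs).

```python
from urllib.parse import unquote, urlparse


def describe_url(url: str) -> dict:
    """Split a media URL into the parts relevant to its scheme."""
    parts = urlparse(url)
    if parts.scheme == 'file':
        # file:///a/b.mp4 -> /a/b.mp4 (POSIX-style path)
        return {'scheme': 'file', 'local_path': unquote(parts.path)}
    # For s3://bucket/key, the bucket is the network location
    return {
        'scheme': parts.scheme,
        'bucket': parts.netloc,
        'key': parts.path.lstrip('/'),
    }


remote = describe_url(
    's3://multimedia-commons/data/videos/mp4/ffe/feb/ffefebb41485539f964760e6115fbc44.mp4'
)
local = describe_url(
    'file:///var/folders/hb/qd0dztsj43j_mdb6hbl1gzyc0000gn/T/tmp1jo4a7ca.mp4'
)
```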