pytubes API

Tube Base Class

class tubes.Tube

The base class for all tubes. Pipelines are typically built by calling the methods defined on this class.

chunk(self, size_t num)

Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json

Recursively search the inputs for an Each() tube. If a single Each() tube whose contents are a list or tuple is found, split its contents into num-sized chunks, and chain together the results of running the pipeline over each chunk.

This is a bit of a hack to support some tricky use cases. For example, reading very large gzipped files requires treating every file as a separate gzip stream, and calling .chunk(1) makes this work.

It’s also an experiment to see how multi-threading may work in the future.

>>> list(Each(["file.gz", "file2.gz"]).read_files().gunzip(stream=True))
Traceback (most recent call last):
...
ValueError: Trailing data in gzip stream
>>> list(Each(["file.gz", "file2.gz"]).read_files().gunzip(stream=True).chunk(1))
["file1 contents", "file2 contents"]

csv(self, headers=True, sep=u',', split=True, skip_empty_rows=True)

Compatible input types: bytes, object

Interpret the input values as rows of a CSV file. Each input to csv() is treated as a separate row in the file.

Parameters:
  • headers (bool) – [default: True] If True, will read the first input value as a list of field names separated by sep, allowing subsequent access to values by name, as well as by index.
  • sep (str) – [default: ‘,’] A single-character string that is used as the field separator when reading rows.
  • split (bool) – [default: True] If True, split the input bytes on newlines to identify rows. If False, each input value is assumed to be a separate row.
  • skip_empty_rows (bool) – [default: True] Skip over blank rows in the input (note that whitespace causes a row to be considered non-blank).

>>> list(Each(['sample.csv']).read_files().csv())
[(b'abc', b'def'), (b'ghi', b'jkl')]
>>> list(Each(['a,b', 'c,d']).csv())
[(b'a', b'b'), (b'c', b'd')]
>>> list(Each(['a,b', 'c,d']).csv(headers=True).get('a'))
[b'c']
>>> list(Each(['a,b', 'c,d']).csv(headers=False).get(1).to(str))
['b', 'd']
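For readers more familiar with Python's standard csv module, the headers behaviour can be approximated as follows. The helper column is hypothetical and, unlike pytubes, returns str values rather than bytes; it only illustrates how the first row supplies names for later lookups.

```python
import csv
from typing import Iterable, List, Union


def column(lines: Iterable[str], key: Union[str, int], headers: bool = True, sep: str = ",") -> List[str]:
    """Return one column from CSV rows, looked up by name (headers=True) or index."""
    rows = csv.reader(lines, delimiter=sep)
    if headers:
        names = next(rows)  # the first row holds the field names
        index = names.index(key) if isinstance(key, str) else key
    else:
        index = key
    return [row[index] for row in rows]


print(column(["a,b", "c,d"], "a"))               # ['c']
print(column(["a,b", "c,d"], 1, headers=False))  # ['b', 'd']
```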

enum(self, codec=u'utf-8')

Compatible input types: Null, Bool, Int64, float, bytes, str

Convert the input to Python objects, storing and reusing the object for duplicate values where possible to reduce allocation overhead.
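A rough pure-Python illustration of the caching idea: each distinct raw value is decoded once, and repeats get the same Python object back. The helper enum_values is hypothetical, and this is only a sketch of the general technique, not how pytubes implements enum().

```python
from typing import Dict, Iterable, Iterator


def enum_values(raw_values: Iterable[bytes], codec: str = "utf-8") -> Iterator[str]:
    """Decode values, reusing one str object per distinct input value."""
    cache: Dict[bytes, str] = {}
    for raw in raw_values:
        obj = cache.get(raw)
        if obj is None:
            obj = raw.decode(codec)  # allocate only the first time a value is seen
            cache[raw] = obj
        yield obj


out = list(enum_values([b"red", b"green", b"red"]))
print(out)               # ['red', 'green', 'red']
print(out[0] is out[2])  # True: the repeated value is the same object
```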

enumerate(self, start=0)

Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json

Similar to the Python builtin enumerate() function, prefix the tube’s dtype with an Int64 counter, starting from start.

>>> list(Each(['a', 'b', 'c']).enumerate())
[(0, 'a'), (1, 'b'), (2, 'c')]
>>> list(Each(['a', 'b', 'c']).enumerate(10))
[(10, 'a'), (11, 'b'), (12, 'c')]

equals(self, value)

Compatible input types: Null, Bool, Int64, float, bytes, str, object

Compare the values in the parent tube against a static value, and output a bool tube with the result of that comparison.

>>> list(Each(['apple', 'banana', 'cloud']).to(str).equals('banana'))
[False, True, False]
>>> list(Each([False, 0, '']).equals(False))
[True, True, False]

first(self, size_t num)

Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json

Create a tube that yields no more than the first num items from its parent.

>>> list(Each([1,2,3,4,5]).first(1))
[1]
>>> list(Each([1,2,3,4,5]).first(10))
[1, 2, 3, 4, 5]

get(self, key, default=UNDEFINED, codec=u'utf-8')

Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json

Efficiently read the field key from the input object and return its value.

If the field is missing and a default is provided, return default; otherwise raise KeyError.

default must be a valid value for the input type. For example, if the input is a Json dtype, default must be a string/bytes value that is valid JSON (e.g. 'null').

>>> list(Each(['{"a": 1}', '{"a": 2}']).json().get("a"))
[1, 2]
>>> list(Each(['{"a": 1}', '{"b": 2}']).json().get("a"))
Traceback (most recent call last):
...
KeyError: Field not found
>>> list(Each(['{"a": 1}', '{"b": 2}']).json().get("a", "null"))
[1, None]
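The rule that default must itself be a valid value for the input dtype can be mimicked with Python's json module: for Json input, the default is JSON text (such as 'null') that is parsed just like a real field value. The helper get_field and the _UNDEFINED sentinel are hypothetical stand-ins for illustration only.

```python
import json
from typing import Any, Iterable, List

_UNDEFINED = object()  # sentinel meaning "no default supplied"


def get_field(docs: Iterable[str], key: str, default: Any = _UNDEFINED) -> List[Any]:
    """Read key from each JSON document; default is JSON text, parsed when used."""
    results = []
    for text in docs:
        doc = json.loads(text)
        if key in doc:
            results.append(doc[key])
        elif default is _UNDEFINED:
            raise KeyError("Field not found")
        else:
            results.append(json.loads(default))  # default must be valid JSON
    return results


print(get_field(['{"a": 1}', '{"b": 2}'], "a", "null"))  # [1, None]
```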

group_id(self)

Compatible input types: Bool, Int64, float, bytes, str

Assuming the input is sorted, for each unique value in the input, return a unique integer ID for that value. If the input isn’t sorted, return a value that increments every time the input changes.

>>> list(Each(['a', 'a', 'b', 'b', 'c', 'd']).group_id())
[0, 0, 1, 1, 2, 3]
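The documented rule is equivalent to a counter that increments whenever a value differs from the one before it, which yields one ID per distinct value only when equal values are adjacent (as in sorted input). A pure-Python sketch, using a hypothetical group_ids helper:

```python
from typing import Iterable, Iterator


def group_ids(values: Iterable[object]) -> Iterator[int]:
    """Yield an ID that increases by one whenever the value changes."""
    current = -1
    previous = object()  # sentinel that never equals a real value
    for value in values:
        if value != previous:
            current += 1
            previous = value
        yield current


print(list(group_ids(["a", "a", "b", "b", "c", "d"])))  # [0, 0, 1, 1, 2, 3]
print(list(group_ids(["a", "b", "a"])))                # [0, 1, 2]: unsorted input
```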

gt(self, value)

Compatible input types: Null, Bool, Int64, float, object

Return a bool tube that is True if the input is greater than value, otherwise False.

>>> list(Count().skip_unless(lambda x: x.gt(4)).first(2))
[5, 6]

gunzip(self, stream=False)

Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json

Decompress zlib/gzip-compressed input bytes, returning views of up to 16 megabytes of decompressed data at a time.

By default, this assumes that each input slice is an entire stream (e.g. from map_files()). Setting stream=True treats all input values as parts of a single gzip stream.

>>> list(Each(["file.gz", "file2.gz"]).map_files().gunzip())
["file1 contents", "file2 contents"]
>>> list(Each(["file.gz", "file2.gz"]).read_files().gunzip(stream=True).chunk(1))
["file1 contents", "file2 contents"]

is_blank(self)

Compatible input types: bytes, str

Return True if the length of the input is 0, otherwise False.

>>> list(Each(['', 'b', 'ba', 'ban']).to(str).is_blank())
[True, False, False, False]

json(self)

Compatible input types: bytes, str, object

Interpret the input as JSON documents. The JSON parser is not validating, and will accept invalid JSON.

>>> list(Each(['1', '{}', '[]', 'null']).to(tubes.Utf8).json())
[1, {}, [], None]
>>> list(Each(['neil']).to(tubes.Utf8).json())
[None]

len(self)

Compatible input types: bytes, str

Return the length of the bytes/string value. For bytes types, this is the length of the value in bytes; for str types, it is the number of characters.

Note that, technically, the length returned for str types is the count of Unicode code points. This is not always equal to the number of visual characters, especially when modifiers are used. For example, 👋🏽 may appear as a single symbol, but is counted as two characters here.

>>> list(Each(['', 'b', 'ba', 'ban']).to(str).len())
[0, 1, 2, 3]
>>> list(Each(['£', '👋🏽']).to(str).len()) 
[1, 2]
>>> list(Each(['£', '👋🏽']).to(bytes).len())
[2, 8]
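Python's own len() on str also counts code points, so the numbers in the examples above can be cross-checked without pytubes, assuming the bytes conversion uses UTF-8 (the default codec). The emoji is written with escapes here, assuming it is U+1F44B followed by the U+1F3FD skin-tone modifier.

```python
# U+1F44B WAVING HAND SIGN followed by U+1F3FD, a skin-tone modifier.
wave = "\U0001F44B\U0001F3FD"
pound = "\u00a3"  # the pound sign, £

print(len(wave))                    # 2: len() counts code points
print([hex(ord(c)) for c in wave])  # ['0x1f44b', '0x1f3fd']
print(len(wave.encode("utf-8")))    # 8: each of these code points takes 4 bytes in UTF-8
print(len(pound), len(pound.encode("utf-8")))  # 1 2
```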

lt(self, value)

Compatible input types: Null, Bool, Int64, float, object

Return a bool tube that is True if the input is less than value, otherwise False.

>>> list(Count().skip_unless(lambda x: x.lt(4)).first(4))
[0, 1, 2, 3]

map_files(self)

Compatible Dtypes: bytes, Utf8

For each filename from the input, open the file and mmap it to a byte slice.

multi(self, *makers)

Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json

Perform multiple operations on a tube, and return a single tube with one slot for each tube produced by makers.

makers should be a callable that takes the input tube and returns a tuple of tubes derived from the input.

>>> list(Count().multi(lambda x: (x.lt(1), x.equals(1), x.gt(1))).first(3))
[(True, False, False), (False, True, False), (False, False, True)]

ndarray(self, *slot_info, estimated_rows=32768, fields=None)

Create a new numpy ndarray with appropriate numpy dtype, and fill it with the results of the tube.

Parameters:
  • estimated_rows (int) – A hint to pytubes as to how many rows are expected in the completed array. This affects the way in which the ndarray is resized during iteration.
  • fields – If True, the created numpy array is 1-dimensional, using structured types. Field names are string slot numbers. If False, all slots must have an identical type, and an n-dimensional array is produced, with each slot in a different dimension.
  • slot_info – Provide metadata about each slot to help create the ndarray. This is currently required by bytes types to set the column size. The n-th slot_info value is used to affect the numpy dtype of the n-th slot of the input.

>>> Each(['abcd', 'efgh']).to(bytes).ndarray(2)
array([b'ab', b'ef'], dtype='|S3')
>>> Each(['abcd', 'efgh']).to(bytes).ndarray(4)
array([b'abcd', b'efgh'], dtype='|S5')

one

Return just the first value from the tube; useful as a debugging aid.

read_fileobj(self, size=8388608)

Compatible Dtypes: object

Each item in the input must be a binary file object.

Return an iterator that reads the contents of each file object in chunks of size bytes, yielding each chunk in turn.
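The chunked reading described above resembles the common Python idiom of calling read(size) until it returns an empty bytes object. The generator read_chunks below is a hypothetical pure-Python analogue that keeps each file's chunks separate, using in-memory io.BytesIO objects in place of real files; it is not pytubes' implementation.

```python
import io
from typing import BinaryIO, Iterable, Iterator


def read_chunks(fileobjs: Iterable[BinaryIO], size: int = 8388608) -> Iterator[bytes]:
    """Yield each file object's contents in pieces of at most size bytes."""
    for f in fileobjs:
        # iter(callable, sentinel) calls f.read(size) until it returns b"" at EOF
        yield from iter(lambda: f.read(size), b"")


files = [io.BytesIO(b"abcdefg"), io.BytesIO(b"xyz")]
print(list(read_chunks(files, size=3)))  # [b'abc', b'def', b'g', b'xyz']
```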

read_files(self)

Compatible Dtypes: bytes, Utf8

For each filename from the input, open the file and read successive chunks of data from it. Each new file will start at the beginning of a slice, but a single file may result in multiple slices.

Care must be taken when using this with iterators that have file-level context (for example gunzip()) as file boundaries are not communicated.

chunk(1) may be used to process files in isolation before combining the iterators.

>>> list(Each(['file1.txt']).read_files().split())
['file 1, line 1', 'file 1, line 2']
>>> list(Each(['file1.txt', 'file2.txt']).read_files().split().skip(1).chunk(1))
['file 1 line 2', 'file 1 line 3', 'file 2 line 2', ...]

skip(self, size_t num)

Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json

Create a tube that skips the first num items, yielding any further items.

>>> list(Each([1,2,3,4,5]).skip(1))
[2, 3, 4, 5]
>>> list(Each([1,2,3,4,5]).skip(10))
[]

skip_if(self, conditional)

Compatible input types: Bool, Int64, float, bytes, str, object

conditional must either be a pre-made bool tube, or a callable that takes a single tube argument (the parent), and returns a bool tube.

Iterates over conditional and the parent together, yielding values only where the result of conditional is False.

Stops only when either the input or the conditional raises StopIteration. This can be slightly unexpected in cases such as Count().skip_if(lambda x: x.gt(2)): the result is [0, 1, 2], but iteration over the tube never completes, because skip_if isn’t clever enough to work out that the condition tube will never return another False. In this case, an explicit .first(n) will limit the run time.

>>> list(Count().skip_if(lambda x: x.lt(5)).first(2))
[5, 6]
>>> list(Count().skip_if(Each([False, True, False]).to(bool)))
[0, 2]

skip_unless(self, conditional)

Compatible input types: Bool, Int64, float, bytes, str, object

conditional must either be a pre-made bool tube, or a callable that takes a single tube argument (the parent), and returns a bool tube.

Iterates over conditional and the parent together, yielding values only where the result of conditional is True.

Stops only when either the input or the conditional raises StopIteration. This can be slightly unexpected in cases such as Count().skip_unless(lambda x: x.lt(3)): the result is [0, 1, 2], but iteration over the tube never completes, because skip_unless isn’t clever enough to work out that the condition tube will never return another True. In this case, an explicit .first(n) will limit the run time.

>>> list(Count().skip_unless(lambda x: x.gt(4)).first(2))
[5, 6]
>>> list(Count().skip_unless(Each([False, True, False]).to(bool)))
[1]
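Pairing the parent with the conditional, and stopping as soon as either runs out, is what Python's zip() does, so the documented behaviour of skip_if and skip_unless (including the need for an explicit limit on infinite inputs) can be sketched with plain generators. These functions are illustrative stand-ins that operate on Python iterables, not the pytubes implementation.

```python
from itertools import count, islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def skip_unless(values: Iterable[T], conditions: Iterable[bool]) -> Iterator[T]:
    """Yield values whose paired condition is True; stop when either side ends."""
    for value, keep in zip(values, conditions):  # zip stops at the shorter input
        if keep:
            yield value


def skip_if(values: Iterable[T], conditions: Iterable[bool]) -> Iterator[T]:
    """Yield values whose paired condition is False."""
    return skip_unless(values, (not c for c in conditions))


# A finite condition ends iteration, even though count() is infinite.
print(list(skip_unless(count(), [False, True, False])))  # [1]
print(list(skip_if(count(), [False, True, False])))      # [0, 2]

# An infinite condition needs an explicit limit, playing the role of .first(2).
print(list(islice(skip_unless(count(), (x > 4 for x in count())), 2)))  # [5, 6]
```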

slot(self, size_t num, default=UNDEFINED)

Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json

Return slot number num from the parent tube. This is only useful when dealing with multi-slot tubes.

If the value in the slot can be considered undefined (for example, a missing JSON value), then default is returned if provided; otherwise KeyError is raised.

split(self, sep=u'\n', trim=u'', skip_empty=False)

Compatible input types: bytes, str

Split the input view on the character sep. This behaves similarly to Python's str.split() method.

This iterator will typically produce multiple values for each input value, as each split produces a new output value.

>>> list(Each(['a.b.c', 'd.e']).split("."))
['a', 'b', 'c', 'd', 'e']
>>> list(Each(['ab\ncd', 'ef\ngh']).split())
['ab', 'cd', 'ef', 'gh']

to(self, *types, codec=u'utf-8')

Convert the input to the specified dtype.

Supported conversions:

[Table: supported conversions, with source dtypes (From) as rows and target dtypes (To) as columns, both ranging over Null, Bool, Int64, float, bytes, str, object and Json.]

to_py(self)

Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json

Convert a tube of any type to python objects.

This is not typically needed, as calling __iter__ on a tube implicitly converts values to Python objects.

to_pyarrow(self, fields)

Return a new pyarrow Array or Table, containing the results of the tube.

Only available if the pyarrow module is installed.

Parameters:
  • fields – The names of the Table columns.

>>> tubes.Count(1).first(10).to_pyarrow(['val']).to_pandas()
   val
0    1
1    2
...
>>> tubes.Count(1)\
...     .first(10)\
...     .multi(lambda x: (
...         x,
...         x.to(float),
...         x.gt(5)
...     )).to_pyarrow(['val', 'val float', 'is_big']).to_pandas()
   val  val float  is_big
0    1        1.0   False
1    2        2.0   False
...

tsv(self, headers=True, sep=u'\t', split=True, skip_empty_rows=True)

Compatible input types: bytes, object

Interpret the input values as rows of a TSV file.

Parameters:
  • headers (bool) – [default: True] If True, will read the first input value as a tab-separated list of field names, allowing subsequent access to values by name, as well as by index.
  • sep (str) – [default: ‘\t’] A single-character string that is used as the field separator when reading rows.
  • split (bool) – [default: True] If True, split the input bytes on newlines to identify rows. If False, each input value is assumed to be a separate row.
  • skip_empty_rows (bool) – [default: True] Skip over blank rows in the input (note that whitespace causes a row to be considered non-blank).

>>> list(Each(['sample.tsv']).read_files().tsv())
[(b'abc', b'def'), (b'ghi', b'jkl')]
>>> list(Each(['a\tb', 'c\td']).tsv())
[(b'a', b'b'), (b'c', b'd')]
>>> list(Each(['a\tb', 'c\td']).tsv(headers=True).get('a'))
[b'c']
>>> list(Each(['a|b', 'c|d']).tsv(headers=False, sep='|').get(1).to(str))
['b', 'd']

zip(self, other)

Combine two inputs into one, by joining their dtypes, and iterating both together.

>>> first = Each(['a', 'b', 'c']).to(str)
>>> second = Each([1, 2, 3]).to(int)
>>> (first.dtype, second.dtype)
((DType[Utf8],), (DType[Int64],))
>>> first.zip(second).dtype
(DType[Utf8], DType[Int64])
>>> list(first.zip(second))
[('a', 1), ('b', 2), ('c', 3)]

Other Classes

class tubes.Each

Iterate over the provided python object, as an input to a tube. Takes one argument, which should either be a python iterator/generator, or an iterable.

>>> list(Each([1, 2, 3]))
[1, 2, 3]
>>> list(Each(itertools.count()).first(5))
[0, 1, 2, 3, 4]
>>> list(Each(i*2 for i in range(5)))
[0, 2, 4, 6, 8]

class tubes.Count

Iterator that behaves similarly to itertools.count().

Takes an optional numeric argument start that sets the first number returned by Count() [default: 0].

>>> list(Count().first(5))
[0, 1, 2, 3, 4]
>>> list(Count(10).first(5))
[10, 11, 12, 13, 14]

start

start: ‘size_t’