pytubes API
Tube Base Class
-
class tubes.Tube
The base class for all tubes. Pipelines are typically built by calling the methods defined on this class.
-
chunk(self, size_t num)
Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json
Recursively look at the inputs to find an Each() tube. If a single Each() tube with list/tuple contents is found, split its value into num-sized chunks and chain them together. This is a bit of a hack to support some tricky use cases (for example, reading very large gzipped files requires treating every file as a separate gzip stream, and calling .chunk(1) allows this to work).
It’s also an experiment to see how multi-threading may work in the future.
>>> list(Each(["file.gz", "file2.gz"]).read_files().gunzip(stream=True))
Traceback (most recent call last):
...
ValueError: Trailing data in gzip stream
>>> list(Each(["file.gz", "file2.gz"]).read_files().gunzip(stream=True).chunk(1))
["file1 contents", "file2 contents"]
-
csv(self, headers=True, sep=u',', split=True, skip_empty_rows=True)
Compatible input types: bytes, object
Interpret the input values as rows of a CSV file. Each input to csv() is treated as a separate row in the file.
Parameters:
- headers (bool) – [default: True] If True, will read the first input value as a comma-separated list of field names, allowing subsequent access to values by name, as well as by index.
- sep (str) – [default: ‘,’] A single-character string that is used as the field separator when reading rows.
- split (bool) – [default: True] If True, split the input bytes on newlines to identify rows. If False, each input value is assumed to be a separate row.
- skip_empty_rows (bool) – [default: True] Skip over blank rows in the input (note: whitespace causes a row to be considered non-blank).
>>> list(Each(['sample.csv']).read_files().csv())
[(b'abc', b'def'), (b'ghi', b'jkl')]
>>> list(Each(['a,b', 'c,d']).csv())
[(b'a', b'b'), (b'c', b'd')]
>>> list(Each(['a,b', 'c,d']).csv(headers=True).get('a'))
[b'c']
>>> list(Each(['a,b', 'c,d']).csv(headers=False).get(1).to(str))
['b', 'd']
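To show what headers=True means for name-based access, here is a rough analogue using the standard library csv module: the first row supplies the field names, and later rows are looked up by the resolved column index. `csv_get` is a hypothetical helper; the stdlib reader's quoting rules may differ from the pytubes parser.

```python
import csv
from typing import Iterable, Iterator


def csv_get(lines: Iterable[str], field: str, sep: str = ",") -> Iterator[str]:
    """Hypothetical model of .csv(headers=True).get(field) built on the stdlib csv module."""
    reader = csv.reader(lines, delimiter=sep)
    header = next(reader)          # first row holds the field names
    index = header.index(field)    # resolve the name to a column index once
    for row in reader:             # remaining rows are data rows
        yield row[index]


print(list(csv_get(["a,b", "c,d"], "a")))                   # ['c']
print(list(csv_get(["x|y", "1|2", "3|4"], "y", sep="|")))   # ['2', '4']
```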
-
enum(self, codec=u'utf-8')
Compatible input types: Null, Bool, Int64, float, bytes, str
Convert the input to Python objects, storing converted values so that duplicate inputs return the same stored object where possible, reducing allocation overhead.
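One plausible reading of this caching, sketched in plain Python for bytes input only: keep a dict from each raw value to its decoded object and hand back the stored object on repeats. `enum_decode` is a hypothetical helper; the actual cache strategy used by pytubes is not described here.

```python
from typing import Dict, Iterable, Iterator


def enum_decode(values: Iterable[bytes], codec: str = "utf-8") -> Iterator[str]:
    """Hypothetical model of enum(): decode each distinct value once and reuse the object."""
    cache: Dict[bytes, str] = {}
    for raw in values:
        decoded = cache.get(raw)
        if decoded is None:          # first occurrence: allocate a new str
            decoded = raw.decode(codec)
            cache[raw] = decoded
        yield decoded                # repeats return the already-stored object


out = list(enum_decode([b"apple", b"pear", b"apple"]))
print(out)               # ['apple', 'pear', 'apple']
print(out[0] is out[2])  # True: both duplicates share one object
```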
-
enumerate(self, start=0)
Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json
Similar to the Python builtin enumerate() function: prefixes the tube’s dtype with an Int64 counter, starting from start.
>>> list(Each(['a', 'b', 'c']).enumerate())
[(0, 'a'), (1, 'b'), (2, 'c')]
>>> list(Each(['a', 'b', 'c']).enumerate(10))
[(10, 'a'), (11, 'b'), (12, 'c')]
-
equals(self, value)
Compatible input types: Null, Bool, Int64, float, bytes, str, object
Compare the values in the parent against a static value, and output a bool tube with the result of that comparison.
>>> list(Each(['apple', 'banana', 'cloud']).to(str).equals('banana'))
[False, True, False]
>>> list(Each([False, 0, '']).equals(False))
[True, True, False]
-
first(self, size_t num)
Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json
Create a tube that yields no more than the first num items from its parent.
>>> list(Each([1,2,3,4,5]).first(1))
[1]
>>> list(Each([1,2,3,4,5]).first(10))
[1, 2, 3, 4, 5]
-
get(self, key, default=UNDEFINED, codec=u'utf-8')
Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json
Efficiently read the field key from the input object and return its value. If the field is missing and a default is provided, return default; otherwise raise a KeyError. default must be a valid value for the input type. For example, if the input is a Json dtype, default must be a string/bytes value that is valid JSON (e.g. 'null').
>>> list(Each(['{"a": 1}', '{"a": 2}']).json().get("a"))
[1, 2]
>>> list(Each(['{"a": 1}', '{"b": 2}']).json().get("a"))
Traceback (most recent call last):
...
KeyError: Field not found
>>> list(Each(['{"a": 1}', '{"b": 2}']).json().get("a", "null"))
[1, None]
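The rule that default must itself be valid JSON text can be modelled with the standard json module: the default is parsed just like an input document, so "null" becomes None. `get_field` is a hypothetical helper illustrating the semantics above, not the pytubes implementation.

```python
import json
from typing import Any, Iterable, List

_MISSING = object()  # sentinel meaning "no default supplied"


def get_field(docs: Iterable[str], key: str, default: Any = _MISSING) -> List[Any]:
    """Hypothetical model of .json().get(key, default) built on the stdlib json module."""
    # The default is JSON text, parsed the same way as each input document.
    fallback = _MISSING if default is _MISSING else json.loads(default)
    out = []
    for doc in docs:
        obj = json.loads(doc)
        if key in obj:
            out.append(obj[key])
        elif fallback is not _MISSING:
            out.append(fallback)
        else:
            raise KeyError("Field not found")
    return out


print(get_field(['{"a": 1}', '{"a": 2}'], "a"))          # [1, 2]
print(get_field(['{"a": 1}', '{"b": 2}'], "a", "null"))  # [1, None]
```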
-
group_id(self)
Compatible input types: Bool, Int64, float, bytes, str
Assuming the input is sorted, for each unique value in the input, return a unique integer ID for that value. If the input isn’t sorted, return a value that increments every time the input changes.
>>> list(Each(['a', 'a', 'b', 'b', 'c', 'd']).group_id())
[0, 0, 1, 1, 2, 3]
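The behaviour described above amounts to a counter that increments whenever the value changes from the previous one, which is why sorted input yields one ID per unique value. A minimal pure-Python sketch of that rule (the standalone `group_id` function here is hypothetical):

```python
from typing import Iterable, Iterator


def group_id(values: Iterable[object]) -> Iterator[int]:
    """Model of group_id(): the ID increments every time the value differs from the previous one."""
    current = -1
    previous: object = object()  # sentinel that compares unequal to any input value
    for value in values:
        if value != previous:
            current += 1
            previous = value
        yield current


print(list(group_id(['a', 'a', 'b', 'b', 'c', 'd'])))  # [0, 0, 1, 1, 2, 3]
print(list(group_id(['a', 'b', 'a'])))                 # [0, 1, 2]: unsorted, 'a' gets a second ID
```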
-
gt(self, value)
Compatible input types: Null, Bool, Int64, float, object
Return a bool tube that is True if the input is greater than value, otherwise False.
>>> list(Count().skip_unless(lambda x: x.gt(4)).first(2))
[5, 6]
-
gunzip(self, stream=False)
Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json
zlib/gzip decompress the input bytes, returning a view of up to 16 megabytes of decompressed data.
By default, this assumes that each input slice is an entire stream (i.e. from map_files). Setting stream=True treats all input values as part of a single gzip stream.
>>> list(Each(["file.gz", "file2.gz"]).map_files().gunzip())
["file1 contents", "file2 contents"]
>>> list(Each(["file.gz", "file2.gz"]).read_files().gunzip(stream=True).chunk(1))
["file1 contents", "file2 contents"]
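The difference between the two modes can be illustrated with Python's own zlib and gzip modules (an analogy, not pytubes internals, and without the 16 MB view limit): decompressing each value on its own always works, but treating several complete gzip files as one stream leaves bytes after the first stream ends, which is where a "trailing data" error comes from. `gunzip_each` and `gunzip_stream` are hypothetical helpers.

```python
import gzip
import zlib
from typing import Iterable, List


def gunzip_each(blobs: Iterable[bytes]) -> List[bytes]:
    """Like stream=False: every input value is a complete gzip stream on its own."""
    return [gzip.decompress(blob) for blob in blobs]


def gunzip_stream(chunks: Iterable[bytes]) -> bytes:
    """Like stream=True: all input values are pieces of ONE gzip stream."""
    d = zlib.decompressobj(wbits=31)  # 31 = 16 + 15: expect a gzip header and trailer
    data = d.decompress(b"".join(chunks))
    if d.unused_data:  # bytes left after the stream ended, e.g. a second gzip file
        raise ValueError("Trailing data in gzip stream")
    return data


files = [gzip.compress(b"file1 contents"), gzip.compress(b"file2 contents")]
print(gunzip_each(files))                              # [b'file1 contents', b'file2 contents']
print(gunzip_stream([files[0][:10], files[0][10:]]))   # b'file1 contents' (one file, split in two)
print([gunzip_stream([f]) for f in files])             # per-file isolation, like .chunk(1)
```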
-
is_blank(self)
Compatible input types: bytes, str
Returns True if the length of the input is 0, otherwise False.
>>> list(Each(['', 'b', 'ba', 'ban']).to(str).is_blank())
[True, False, False, False]
-
json(self)
Compatible input types: bytes, str, object
Interpret the input as JSON documents. The JSON parser is not validating, and will accept invalid JSON.
>>> list(Each(['1', '{}', '[]', 'null']).to(tubes.Utf8).json())
[1, {}, [], None]
>>> list(Each(['neil']).to(tubes.Utf8).json())
[None]
-
len(self)
Compatible input types: bytes, str
Return the length of the bytes/string value. For bytes types, this returns the byte length of the value, for str types, the number of characters is returned.
Note: technically, the length returned for str types is the count of Unicode code points. This is not always equal to the number of visual characters, especially when modifiers are used. For example, 👋🏽 may appear as a single symbol, but is counted as two characters here.
>>> list(Each(['', 'b', 'ba', 'ban']).to(str).len())
[0, 1, 2, 3]
>>> list(Each(['£', '👋🏽']).to(str).len())
[1, 2]
>>> list(Each(['£', '👋🏽']).to(bytes).len())
[2, 8]
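Plain Python draws the same distinction, which makes the numbers above easy to check: len() on a str counts code points, while len() on the encoded bytes counts bytes. This assumes the bytes conversion uses UTF-8, the default codec in the signatures above.

```python
samples = ["£", "👋🏽"]

# str length: Unicode code points (the emoji is a base character plus a skin-tone modifier)
print([len(s) for s in samples])                  # [1, 2]
# bytes length: UTF-8 encoded size ('£' is 2 bytes; each emoji code point is 4 bytes)
print([len(s.encode("utf-8")) for s in samples])  # [2, 8]
```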
-
lt(self, value)
Compatible input types: Null, Bool, Int64, float, object
Return a bool tube that is True if the input is less than value, otherwise False.
>>> list(Count().skip_unless(lambda x: x.lt(4)).first(4))
[0, 1, 2, 3]
-
map_files(self)
Compatible input types: bytes, Utf8
For each filename from the input, open the file and mmap it to a byte slice.
-
multi(self, *makers)
Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json
Perform multiple operations on a tube, and return a tube with n slots. makers should be a callable that takes the input tube and returns a tuple of tubes derived from the input.
>>> list(Count().multi(lambda x: (x.lt(1), x.equals(1), x.gt(1))).first(3))
[(True, False, False), (False, True, False), (False, False, True)]
-
ndarray(self, *slot_info, estimated_rows=32768, fields=None)
Create a new numpy ndarray with an appropriate numpy dtype, and fill it with the results of the tube.
Parameters:
- estimated_rows (int) – A hint to pytubes as to how many rows are expected in the completed array. This affects the way in which the ndarray is resized during iteration.
- fields – If True, the created numpy array is 1-dimensional, using structured types. Field names are string slot numbers. If False, all slots must have an identical type, and an n-dimensional array is produced, with each slot in a different dimension.
- slot_info – Metadata about each slot, used to help create the ndarray. Currently required by bytes types to set the column size. The n-th slot_info value is used to affect the numpy dtype of the n-th slot of the input.
>>> Each(['abcd', 'efgh']).to(bytes).ndarray(2)
array([b'ab', b'ef'], dtype='|S3')
>>> Each(['abcd', 'efgh']).to(bytes).ndarray(4)
array([b'abcd', b'efgh'], dtype='|S5')
-
one
Return just the first value from the tube; useful as a debugging aid.
-
read_fileobj(self, size=8388608)
Compatible input types: object
Each item in the input must be a binary file object.
Returns an iterator that reads the contents of each file object in chunks of size bytes (default 8388608, i.e. 8 MiB) and yields them.
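A plain-Python generator with the behaviour described above, assuming each file object is read with repeated read(size) calls until it returns b"" and that a new file always starts a new chunk. `read_fileobjs` is a hypothetical helper, shown with in-memory io.BytesIO objects and a tiny size so the chunk boundaries are visible.

```python
import io
from typing import BinaryIO, Iterable, Iterator


def read_fileobjs(fileobjs: Iterable[BinaryIO], size: int = 8388608) -> Iterator[bytes]:
    """Hypothetical model of read_fileobj(size): yield each file's bytes in chunks of at most size."""
    for f in fileobjs:
        while True:
            chunk = f.read(size)
            if not chunk:      # b"" means this file is exhausted; move on to the next one
                break
            yield chunk


print(list(read_fileobjs([io.BytesIO(b"abcdefg"), io.BytesIO(b"xyz")], size=3)))
# [b'abc', b'def', b'g', b'xyz']
```

Because each file starts a fresh chunk, the last chunk of one file is never merged with the first chunk of the next.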
-
read_files(self)
Compatible input types: bytes, Utf8
For each filename from the input, open the file and read successive chunks of data from it. Each new file will start at the beginning of a slice, but a single file may result in multiple slices.
Care must be taken when using this with iterators that have file-level context (for example gunzip()), as file boundaries are not communicated.
chunk(1) may be used to process files in isolation before combining the iterators.
>>> list(Each(['file1.txt']).read_files().split())
['file 1, line 1', 'file 1, line 2']
>>> list(Each(['file1.txt', 'file2.txt']).read_files().split().skip(1).chunk(1))
['file 1 line 2', 'file 1 line 3', 'file 2 line 2', ...]
-
skip(self, size_t num)
Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json
Create a tube that skips the first num items, yielding any further items.
>>> list(Each([1,2,3,4,5]).skip(1))
[2, 3, 4, 5]
>>> list(Each([1,2,3,4,5]).skip(10))
[]
-
skip_if(self, conditional)
Compatible input types: Bool, Int64, float, bytes, str, object
conditional must either be a pre-made bool tube, or a callable that takes a single tube argument (the parent) and returns a bool tube.
Iterates over conditional and the parent together, yielding values only where the result of conditional is False.
Stops only when either the input or conditional raises StopIteration. This can be slightly unexpected in, for example, this case: Count().skip_if(lambda x: x.gt(2)), where the result is [0, 1, 2], but iteration over the tube will never complete because skip_if isn’t clever enough to work out that the condition tube will never return another False. In this case, an explicit .first(n) will limit the run time.
>>> list(Count().skip_if(lambda x: x.lt(5)).first(2))
[5, 6]
>>> list(Count().skip_if(Each([False, True, False]).to(bool)))
[0, 2]
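The lockstep behaviour above can be modelled with a Python generator: with a pre-made condition stream, zip() ends as soon as either side is exhausted; with a condition derived from the parent, only the parent can end the loop, so an infinite parent needs an explicit bound. Here itertools.islice stands in for .first(n), and a plain predicate stands in for a bool tube; `skip_if` is a hypothetical standalone function, not the tubes method.

```python
from itertools import count, islice
from typing import Callable, Iterable, Iterator, TypeVar, Union

T = TypeVar("T")


def skip_if(parent: Iterable[T],
            conditional: Union[Iterable[bool], Callable[[T], bool]]) -> Iterator[T]:
    """Model of skip_if(): yield values whose condition is False; stop when either side runs out."""
    if callable(conditional):
        # Condition derived from the parent itself, like lambda x: x.gt(2)
        for value in parent:
            if not conditional(value):
                yield value
    else:
        # Pre-made condition stream: zip() stops as soon as either iterator is exhausted
        for value, skip in zip(parent, conditional):
            if not skip:
                yield value


print(list(skip_if(count(), [False, True, False])))        # [0, 2]
print(list(islice(skip_if(count(), lambda v: v < 5), 2)))   # [5, 6]
# Like Count().skip_if(lambda x: x.gt(2)): without islice() this would never finish
print(list(islice(skip_if(count(), lambda v: v > 2), 3)))   # [0, 1, 2]
```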
-
skip_unless(self, conditional)
Compatible input types: Bool, Int64, float, bytes, str, object
conditional must either be a pre-made bool tube, or a callable that takes a single tube argument (the parent) and returns a bool tube.
Iterates over conditional and the parent together, yielding values only where the result of conditional is True.
Stops only when either the input or conditional raises StopIteration. This can be slightly unexpected in, for example, this case: Count().skip_unless(lambda x: x.lt(3)), where the result is [0, 1, 2], but iteration over the tube will never complete because skip_unless isn’t clever enough to work out that the condition tube will never return another True. In this case, an explicit .first(n) will limit the run time.
>>> list(Count().skip_unless(lambda x: x.gt(4)).first(2))
[5, 6]
>>> list(Count().skip_unless(Each([False, True, False]).to(bool)))
[1]
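By analogy, Python's itertools.compress() has the same stopping rule as skip_unless() with a pre-made condition stream (it walks data and selectors together and stops when either is exhausted), while filter() corresponds to a condition derived from each value. This is a comparison with standard library tools, not the tubes implementation; islice() plays the role of .first(n).

```python
from itertools import compress, count, islice

# Pre-made condition stream: compress() stops once the finite selector list runs out,
# even though count() is infinite.
print(list(compress(count(), [False, True, False])))       # [1]

# Condition derived from each value, like lambda x: x.gt(4)
print(list(islice(filter(lambda v: v > 4, count()), 2)))   # [5, 6]

# The never-ending case from the text: after 2 nothing passes, so bound it explicitly.
print(list(islice(filter(lambda v: v < 3, count()), 3)))   # [0, 1, 2]
```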
-
slot(self, size_t num, default=UNDEFINED)
Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json
Return slot number num from the parent iter. This is only useful when dealing with multi-slot tubes. If the value in the slot can be considered undefined (for example, a missing JSON value), then default is returned if provided; otherwise KeyError is raised.
-
split(self, sep=u'\n', trim=u'', skip_empty=False)
Compatible input types: bytes, str
Split the input view on the character sep. This behaves similarly to the Python str.split() function. This iterator will typically produce multiple values for each input value, as each split produces a new output value.
>>> list(Each(['a.b.c', 'd.e']).split("."))
['a', 'b', 'c', 'd', 'e']
>>> list(Each(['ab\ncd', 'ef\ngh']).split())
['ab', 'cd', 'ef', 'gh']
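The one-input-to-many-outputs shape of split() can be sketched as a flattening generator over str.split(sep). `split_all` is a hypothetical helper; it models only sep (not trim or skip_empty), and it reflects that the default separator here is a newline, whereas str.split() with no argument splits on any whitespace.

```python
from typing import Iterable, Iterator


def split_all(values: Iterable[str], sep: str = "\n") -> Iterator[str]:
    """Hypothetical model of split(sep): each input value may yield several output values."""
    for value in values:
        yield from value.split(sep)


print(list(split_all(["a.b.c", "d.e"], ".")))   # ['a', 'b', 'c', 'd', 'e']
print(list(split_all(["ab\ncd", "ef\ngh"])))    # ['ab', 'cd', 'ef', 'gh']
```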
-
to(self, *types, codec=u'utf-8')
Convert the input to the specified dtype.
Supported conversions:
To→ Null Bool Int64 float bytes str object Json ↓From Null ✓ ✓ Bool ✓ ✓ ✓ ✓ ✓ ✓ Int64 ✓ ✓ ✓ ✓ float ✓ ✓ ✓ ✓ bytes ✓ ✓ ✓ ✓ ✓ str ✓ ✓ ✓ ✓ object ✓ ✓ ✓ ✓ ✓ ✓ ✓ ✓ Json ✓ ✓ ✓ ✓
-
to_py(self)
Compatible input types: Null, Bool, Int64, float, bytes, str, object, Json
Convert a tube of any type to Python objects.
This is not typically needed, as calling __iter__ on a tube implicitly converts values to Python objects.
-
to_pyarrow(self, fields)
Return a new pyarrow Array or Table containing the results of the tube.
Only available if the pyarrow module is installed.
Parameters: fields – The names of the Table columns
>>> tubes.Count(1).first(10).to_pyarrow(['val']).to_pandas()
   val
0    1
1    2
...
>>> tubes.Count(1)\
...     .first(10)\
...     .multi(lambda x: (
...         x,
...         x.to(float),
...         x.gt(5)
...     )).to_pyarrow(['val', 'val float', 'is_big']).to_pandas()
   val  val float  is_big
0    1        1.0   False
1    2        2.0   False
...
-
tsv(self, headers=True, sep=u'\t', split=True, skip_empty_rows=True)
Compatible input types: bytes, object
Interpret the input values as rows of a TSV file.
Parameters:
- headers (bool) – [default: True] If True, will read the first input value as a tab-separated list of field names, allowing subsequent access to values by name, as well as by index.
- sep (str) – [default: ‘\t’ (tab)] A single-character string that is used as the field separator when reading rows.
- split (bool) – [default: True] If True, split the input bytes on newlines to identify rows. If False, each input value is assumed to be a separate row.
- skip_empty_rows (bool) – [default: True] Skip over blank rows in the input (note: whitespace causes a row to be considered non-blank).
>>> list(Each(['sample.tsv']).read_files().tsv())
[(b'abc', b'def'), (b'ghi', b'jkl')]
>>> list(Each(['a\tb', 'c\td']).tsv())
[(b'a', b'b'), (b'c', b'd')]
>>> list(Each(['a\tb', 'c\td']).tsv(headers=True).get('a'))
[b'c']
>>> list(Each(['a|b', 'c|d']).tsv(headers=False, sep='|').get(1).to(str))
['b', 'd']
-
zip(self, other)
Combine two inputs into one by joining their dtypes and iterating both together.
>>> first = Each(['a', 'b', 'c']).to(str)
>>> second = Each([1, 2, 3]).to(int)
>>> (first.dtype, second.dtype)
((DType[Utf8],), (DType[Int64],))
>>> first.zip(second).dtype
(DType[Utf8], DType[Int64])
>>> list(first.zip(second))
[('a', 1), ('b', 2), ('c', 3)]
-
Other Classes
-
class tubes.Each
Iterate over the provided Python object, as an input to a tube. Takes one argument, which should be either a Python iterator/generator or an iterable.
>>> list(Each([1, 2, 3]))
[1, 2, 3]
>>> list(Each(itertools.count()).first(5))
[0, 1, 2, 3, 4]
>>> list(Each(i*2 for i in range(5)))
[0, 2, 4, 6, 8]
-
class tubes.Count
Iterator that behaves similarly to itertools.count(). Takes an optional numeric argument start that sets the first number returned by Count() [default: 0].
>>> list(Count().first(5))
[0, 1, 2, 3, 4]
>>> list(Count(10).first(5))
[10, 11, 12, 13, 14]
-
start
start: ‘size_t’