#notest_global
from k1lib.imports import *
2023-05-22 13:04:01,931 INFO worker.py:1364 -- Connecting to existing Ray cluster at address: 192.168.1.35:6379...
2023-05-22 13:04:01,935 INFO worker.py:1544 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
A tutorial on executing tasks in multiple processes on 1 or multiple nodes. Also benchmarks them in detail to see what's going on.
Everything works wonderfully, and things scale very linearly as well. There's one caveat though: things seem to scale proportionally to the number of cores, not the number of threads, which is a little disappointing, as I had thought I'd get twice the processing power.
For a cluster with 40 cores and 64 threads, typical speeds for processing a distributed folder are 270MB/s on 1 core and 9.2GB/s across the cluster, with a raw read performance of 41.3GB/s. For a distributed file, it's 127MB/s on 1 core and 5.44GB/s across the cluster, with a raw read performance of 44GB/s.
Overall, really nice. These speeds are quite insane even though I'm using Python for most of the programming. Most operations are very short as well, often just 1 line of code.
Make sure you understand the basics of the k1lib library first before reading this post.
This post is a little bit long, so here are the main sections:
apply()
applyCl()
code samples and docs
In the library, there are multiple ways to execute a function over lots of data points:
range(10) | apply(lambda x: x**2) | deref() # runs on 1 thread/process on 1 computer/node
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
range(10) | applyTh(lambda x: x**2) | deref() # runs on multiple threads (still only 1 process) on 1 node. "Th" stands for threading
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
range(10) | applyMp(lambda x: x**2) | deref() # runs on multiple processes (1 thread/process) on 1 node. "Mp" stands for multiprocessing
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
range(10) | applyCl(lambda x: x**2) | deref() # runs on multiple processes (1 thread/process) on multiple nodes. "Cl" stands for cluster
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Let's do a quick benchmark to compare their speeds. We'll multiply every number from 0 to 1B by 2 and sum them all up, using each method:
n = 1_000_000_000; range(n) | batched(n//4) | deref(1)
[range(0, 250000000), range(250000000, 500000000), range(500000000, 750000000), range(750000000, 1000000000)]
%%time
range(n) | batched(n//64) | apply(apply(op()*2) | toSum()) | toSum() # 9.3 million operations/second
CPU times: user 1min 47s, sys: 63.6 ms, total: 1min 47s Wall time: 1min 47s
999999999000000000
Here, it's going to first batch the input up into batches of roughly 16M numbers. Then each batch gets handed to a worker (a plain loop here, threads/processes for the later variants), which multiplies every number in the batch by 2 and adds them all up. The answer from each batch is then collected and summed in the main process.
%%time
range(n) | batched(n//64) | applyTh(apply(op()*2) | toSum(), timeout=300) | toSum() # 9.2 million operations/second
CPU times: user 1min 51s, sys: 2.2 s, total: 1min 53s Wall time: 1min 49s
999999999000000000
So, applyTh doesn't provide any speedup at all. The reason is that while the tasks are executed on different threads, each of them has to acquire the GIL, or Global Interpreter Lock, in order to do anything, because that's just how Python works. Many people think this is terrible, but you can actually release the GIL inside C extensions, which I think makes the GIL genuinely useful.
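That said, if f spends most of its time in code that releases the GIL (NumPy number crunching, io, etc.), applyTh can genuinely help. A minimal sketch, assuming np.sum() releases the GIL on large arrays (timings will vary by machine):
import numpy as np                                     # usually already available via k1lib.imports
data = [np.random.rand(1_000_000) for _ in range(64)]  # 64 big arrays of random floats
data | apply(np.sum) | toSum()                         # baseline: 1 thread
data | applyTh(np.sum, timeout=60) | toSum()           # threads can overlap while np.sum does the heavy lifting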
%%time
range(n) | batched(n//64) | applyMp(apply(op()*2) | toSum()) | toSum() # 58 million operations/second
CPU times: user 346 ms, sys: 88.5 ms, total: 435 ms Wall time: 17.2 s
999999999000000000
#notest
b'' | file("applyMp.pth")
for i in range(10):
    with k1.timer() as t:
        range(n) | batched(n//64) | applyMp(apply(op()*2) | toSum()) | toSum()
    t() | aS(dill.dumps) >> file("applyMp.pth")
cat.pickle("applyMp.pth") | deref() | aS(plt.plot); plt.grid(True)
cat.pickle("applyMp.pth") | aS(k1.UValue.fromSeries)
UValue(mean=16.595, std=0.19847) -> 16.6 ± 0.2
%%time
range(n) | batched(n//64) | applyCl(apply(op()*2) | toSum()) | toSum() # 111 million operations/second
CPU times: user 69 ms, sys: 20.3 ms, total: 89.2 ms Wall time: 9 s
999999999000000000
#notest
b'' | file("applyCl.pth")
for i in range(10):
    with k1.timer() as t:
        range(n) | batched(n//64, True) | applyCl(apply(op()*2) | toSum()) | toSum()
    t() | aS(dill.dumps) >> file("applyCl.pth")
cat.pickle("applyCl.pth") | deref() | aS(plt.plot); plt.grid(True)
cat.pickle("applyCl.pth") | aS(k1.UValue.fromSeries)
UValue(mean=9.3556, std=0.93904) -> 9.4 ± 0.9
I'll explain the specific commands later, but in short, I've got one 16-core computer and two 8-core computers, and the main desktop that applyMp ran on has 16 cores, which explains the roughly 2x speedup going from applyMp() to applyCl() perfectly. Also notice that all we did was change applyMp to applyCl in order to run things on multiple nodes. Nothing else in the code needed to change, which again makes the whole thing much easier to just use everywhere.
cat.pickle("applyMp.pth") | deref() | aS(plt.plot); cat.pickle("applyCl.pth") | deref() | aS(plt.plot)
plt.legend(["applyMp", "applyCl"]); plt.grid(True); plt.ylim(bottom=0); plt.xlabel("Trial"); plt.ylabel("Seconds");
So as you can see, there are some serious speed gains to be had just by choosing the right apply version, especially applyMp and applyCl. Initialization docs for all of them:
[apply, applyTh, applyMp, applyCl] | apply(lambda x: [x.__name__, str(inspect.signature(x.__init__)), x.__init__.__doc__]) | ~apply(lambda x,y,z: f"<h3>{x}{y}</h3>" + fmt.pre(f"{z}")) | aS(viz.Carousel)
Applies a function f to every element in the incoming list/iterator. Example:: # returns [0, 1, 4, 9, 16] range(5) | apply(lambda x: x**2) | deref() # returns [[3.0, 1.0, 1.0], [3.0, 1.0, 1.0]], running the function on the 0th column torch.ones(2, 3) | apply(lambda x: x+2, 0) | deref() # returns [[0, -1, 2, 3, -4], [2, -3, 4, 5, -6], [0, -1, 4, 9, -16]], running the function on the 1st (0-index btw) and 4th columns [[0, 1, 2, 3, 4], [2, 3, 4, 5, 6], [0, 1, 4, 9, 16]] | apply(lambda x: -x, [1, 4]) | deref() You can also use this as a decorator, like this:: @apply def f(x): return x**2 # returns [0, 1, 4, 9, 16] range(5) | f | deref() You can also add a cache, like this:: def calc(i): time.sleep(0.5); return i**2 # takes 2.5s range(5) | repeatFrom(2) | apply(calc, cache=10) | deref() # takes 5s range(5) | repeatFrom(2) | apply(calc) | deref() You can add custom keyword arguments into the function:: def f(x, y, z=3): return x + y + z # returns [15, 17, 19, 21, 23] [range(5), range(10, 15)] | transpose() | ~apply(f, z=5) | deref() If "apply" is too hard to remember, this cli also has an alias :class:`map_` that kinda mimics Python's ``map()``. Also slight reminder that you can't pass in extra positional args like in :class:`aS`, just extra keyword arguments. :param column: if not None, then applies the function to that column or columns only :param cache: if specified, then caches this much number of values :param kwargs: extra keyword arguments to pass in the function
Kinda like the same as :class:`applyMp`, but executes ``f`` on multiple threads, instead of on multiple processes. Advantages: - Relatively low overhead for thread creation - Fast, if ``f`` is io-bound - Does not have to serialize and deserialize the result, meaning iterators can be exchanged Disadvantages: - Still has thread creation overhead, so it's still recommended to specify ``bs`` - Is slow if ``f`` has to obtain the GIL to be able to do anything All examples from :class:`applyMp` should work perfectly here.
Like :class:`apply`, but execute a function over the input iterator in multiple processes. Example:: # returns [3, 2] ["abc", "de"] | applyMp(len) | deref() # returns [5, 6, 9] range(3) | applyMp(lambda x, bias: x**2+bias, bias=5) | deref() # returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work someList = [1, 2, 3] ["abc", "de"] | applyMp(lambda s: someList) | deref() Internally, this will continuously spawn new jobs up until 80% of all CPU cores are utilized. On posix systems, the default multiprocessing start method is ``fork()``. This sort of means that all the variables in memory will be copied over. On windows and macos, the default start method is ``spawn``, meaning each child process is a completely new interpreter, so you have to pass in all required variables and reimport every dependencies. Read more at https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods If you don't wish to schedule all jobs at once, you can specify a ``prefetch`` amount, and it will only schedule that much jobs ahead of time. Example:: range(10000) | applyMp(lambda x: x**2) | head() | deref() # 700ms range(10000) | applyMp(lambda x: x**2, 5) | head() | deref() # 300ms # demonstrating there're no huge penalties even if we want all results at the same time range(10000) | applyMp(lambda x: x**2) | deref() # 900ms range(10000) | applyMp(lambda x: x**2, 5) | deref() # 1000ms The first line will schedule all jobs at once, and thus will require more RAM and compute power, even though we discard most of the results anyway (the :class:`~k1lib.cli.filt.head` cli). The second line only schedules 5 jobs ahead of time, and thus will be extremely more efficient if you don't need all results right away. .. note:: Remember that every :class:`~k1lib.cli.init.BaseCli` is also a function, meaning that you can do stuff like:: # returns [['ab', 'ac']] [["ab", "cd", "ac"]] | applyMp(filt(op().startswith("a")) | deref()) | deref() Also remember that the return result of ``f`` should be serializable, meaning it should not be a generator. That's why in the example above, there's a ``deref()`` inside f. You should also convert PyTorch tensors into Numpy arrays Most of the time, you would probably want to specify ``bs`` to something bigger than 1 (may be 32 or sth like that). This will executes ``f`` multiple times in a single job, instead of executing ``f`` only once per job. Should reduce overhead of process creation dramatically. If you encounter strange errors not seen on :class:`apply`, you can try to clear all pools (using :meth:`clearPools`), to terminate all child processes and thus free resources. On earlier versions, you have to do this manually before exiting, but now :class:`applyMp` is much more robust. Also, you should not immediately assume that :class:`applyMp` will always be faster than :class:`apply`. Remember that :class:`applyMp` will create new processes, serialize and transfer data to them, execute it, then transfer data back. If your code transfers a lot of data back and forth (compared to the amount of computation done), or the child processes don't have a lot of stuff to do before returning, it may very well be a lot slower than :class:`apply`. There's a potential loophole here that can make your code faster. Because the main process is forked (at least on linux), every variable is still there, even the big ones. So, you can potentially do something like this:: bigData = [] # 1B items in the list # summing up all items together. 
No input data transfers (because it's forked instead) range(1_000_000_000) | batched(100) | applyMp(lambda r: r | apply(lambda i: bigData[i]) | toSum()) | toSum() In fact, I use this loophole all the time, and thus has made the function :meth:`shared`, so check it out. :param prefetch: if not specified, schedules all jobs at the same time. If specified, schedules jobs so that there'll only be a specified amount of jobs, and will only schedule more if results are actually being used. :param timeout: seconds to wait for job before raising an error :param utilization: how many percent cores are we running? 0 for no cores, 1 for all the cores. Defaulted to 0.8 :param bs: if specified, groups ``bs`` number of transforms into 1 job to be more efficient. :param kwargs: extra arguments to be passed to the function. ``args`` not included as there're a couple of options you can pass for this cli. :param newPoolEvery: creates a new processing pool for every specific amount of input fed. 0 for not refreshing any pools at all. Turn this on in case your process consumes lots of memory and you have to kill them eventually to free up some memory
Like :class:`apply`, but execute a function over the input iterator in multiple processes on multiple nodes inside of a cluster (hence "cl"). So, just a more powerful version of :class:`applyMp`, assuming you have a cluster to run it on. Example:: # returns [3, 2] ["abc", "de"] | applyCl(len) | deref() # returns [5, 6, 9] range(3) | applyCl(lambda x, bias: x**2+bias, bias=5) | deref() # returns [[1, 2, 3], [1, 2, 3]], demonstrating outside vars work someList = [1, 2, 3] ["abc", "de"] | applyCl(lambda s: someList) | deref() Internally, this uses the library Ray (https://www.ray.io) to do the heavy lifting. So, :class:`applyCl` can be thought of as a thin wrapper around that library, but still has the same consistent interface as :class:`apply` and :class:`applyMp`. From all of my tests so far, it seems that :class:`applyCl` works quite well and is quite robust, so if you have access to a cluster, use it over :class:`applyMp`. The library will connect to a Ray cluster automatically when you import everything using ``from k1lib.imports import *``. It will execute ``import ray; ray.init()``, which is quite simple. If you have ray installed, but does not want this default behavior, you can do this:: import k1lib k1lib.settings.startup.init_ray = False from k1lib.imports import * As with :class:`applyMp`, there are pitfalls and weird quirks to multiprocessing, on 1 or multiple nodes, so check out the docs over there to be aware of them, as those translates well to here. .. admonition:: Advanced use case Not really advanced, but just a bit difficult to understand/follow. Let's say that you want to scan through the home directory of all nodes, grab all files, read them, and get the number of bytes they have. You can do something like this:: a = None | applyCl.aS(lambda: None | cmd("ls ~") | filt(os.path.isfile) | deref()) | deref() b = a | ungroup(single=True, begin=True) | deref() c = b | applyCl(cat(text=False) | shape(0), pre=True) | deref() d = c | groupBy(0, True) | apply(item().all() | toSum(), 1) | deref() Noted, this is relatively complex. Let's see what A, B, C and D looks like:: # A [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', ['Miniconda3-latest-Linux-x86_64.sh', 'mintupgrade-2023-04-01T232950.log']], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', ['5a', 'abc.jpg', 'a.txt']]] # B [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 'Miniconda3-latest-Linux-x86_64.sh'], ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 'mintupgrade-2023-04-01T232950.log'], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', '5a'], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 'abc.jpg'], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 'a.txt']] # C [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 74403966], ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 1065252], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 2601], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 16341], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 10177]] # D [['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 92185432], ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 75469218]] The steps we're concerned with is A and C. In step A, we're running 2 processes, 1 for each node, to get all the file names in the home directory. In step C, we're running 5 processes total, 2 on the first node and 3 on the second node. 
For each process, it's going to read as bytes and count up those bytes. Finally in step D, the results are grouped together and the sizes summed. So yeah, it's pretty nice that we did all of that in a relatively short amount of code. The data is distributed too (reading multiple files from multiple nodes), so we're truly not bottlenecked by anything. :param prefetch: if not specified, schedules all jobs at the same time. If specified, schedules jobs so that there'll only be a specified amount of jobs, and will only schedule more if results are actually being used. :param timeout: seconds to wait for job before raising an error :param bs: if specified, groups ``bs`` number of transforms into 1 job to be more efficient. :param rss: resources required for the task. Can be {"custom_resource1": 2} or "custom_resource1" as a shortcut :param pre: "preserve", same convention as :meth:`applyCl.aS`. If True, then allow passing through node ids as the first column to shedule jobs on those specific nodes only :param orPatch: whether to automatically patch __or__ function so that cli tools can work with numpy arrays on that remote worker :param num_cpus: how many cpu does each task take? :param kwargs: extra arguments to be passed to the function. ``args`` not included as there're a couple of options you can pass for this cli.
Just a side note: all of these instructions and applyCl itself only work on Linux. If you're using Windows or Mac for heavy compute, what're you doing? You need to wipe all of your disks and install Ubuntu/Fedora on those computers instead.
On node 1, execute these commands:
conda install python
pip install ray
This is the main library that does all of the heavy lifting behind the scenes.
pip install k1lib
python --version, ray --version
ray start --head
On node 2 (and beyond):
ray start --address=192.168.1.10:6379
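Once every node has joined, a quick sanity check from any machine in the cluster (a minimal sketch; counts will obviously depend on your setup):
from k1lib.imports import *                           # auto-connects to the running Ray cluster
import ray                                            # already initialized by the line above
applyCl.nodeIds() | shape(0)                          # should equal the number of nodes you started
ray.nodes() | filt(lambda n: n["Alive"]) | shape(0)   # same check, using Ray's own API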
These are some example code snippets to show you the style and vibe of how you'd do things with applyCl:
applyCl.nodeIds() # returns a list of all node ids ["abc...", "def...", ...]
applyCl.nodeId() # returns current node's id
applyCl.cpu() # returns current node's #cpu cores
applyCl.diskScan("~/ssd2") # scans for all existing distributed files and folders inside ~/ssd2
# basic usage, returns [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
range(10) | applyCl(op()*2) | deref()
# applies the function to [3, 4], but does so on 2 different nodes. Returns [["abc...", 4], ["def...", 5]]
[["abc...", 3], ["def...", 4]] | applyCl(lambda x: x+1, pre=True) | deref()
# grabs files and folders in the home folder of every node
None | applyCl.aS(lambda: ls("~")) | deref()
# executes random terminal command on all nodes, especially convenient to apt install things everywhere
applyCl.cmd("mkdir ~/abc", sudo=True)
# reads a file from another node, kinda like normal cat()
["abc...", "~/ssd2/someFile.txt"] | applyCl.cat()
# creates a distributed folder by spilling files in the specified folder around to other nodes so that it's balanced
applyCl.balanceFolder("~/ssd2/test")
# first grabs all files inside the folder "~/ssd2/test" from all nodes, then for each file, read it and return the number of lines
None | applyCl.aS(lambda: ls("~/ssd2/test")) | ungroup() | applyCl(cat() | shape(0), pre=True) | deref()
# creates a distributed file by cutting the files up into multiple pieces and send those pieces to the same file on other nodes
applyCl.balanceFile("~/ssd2/giantFile.txt")
# gets the number of lines of the giant file. Splits each fragment (#fragments = #nodes) of the file on multiple nodes into multiple sections (#sections = #cpu cores), then get the number of lines for each fragment and add them all up
applyCl.cat("~/ssd2/giantFile.txt", shape(0)) | toSum()
# plots a histogram of the 1st column in a tab-delimited distributed file
applyCl.cat("~/ssd2/giantFile.txt", table() | cut(0) | toFloat() | hist(300)) | hist.join() | ~aS(plt.bar)
# replicates reference human genome file to all nodes
applyCl.replicateFile("~/ssd2/hg38.fa")
# aligns "sample.truncated" genome reads to the reference genome using hisat2 and produce a "sample.sam" file
None | applyCl.aS(lambda: None | cmd(f"hisat2 -p {applyCl.cpu()} -x ~/ssd2/hg38 -U sample.truncated -S sample.sam") | deref()) | deref()
Docstrings of all the functions:
dir(applyCl) | ~filt(op().startswith("_")) | apply(lambda x: [x, getattr(applyCl, x).__doc__]) | filt(op(), 1) | ~inSet(["f", "hint", "all"], 0) | apply("applyCl." + op() | aS(fmt.h, 3), 0) | apply(fmt.pre, 1) | join("\n").all() | aS(viz.Carousel)
Executes function f once for all node ids that are piped in. Example:: # returns [['1051da...', ['Desktop', 'Downloads']], ['7bb387...', ['Pictures', 'Music']]] applyCl.nodeIds() | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref() # also returns [['1051da...', ['Desktop', 'Downloads']], ['7bb387...', ['Pictures', 'Music']]] None | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref() If you want to execute f for all nodes, you can pass in None instead. As a reminder, this kinda follows the same logic as the popular cli :class:`aS`, where f is executed once, hence the name "apply Single". Here, the meaning of "single" is different. It just means execute once for each node ids. :param f: main function to execute in each node. Not supposed to accept any arguments :param timeout: seconds to wait for job before raising an error
Splits a specified file in node nAs and dumps other parts to nodes nBs. Example:: applyCl.balanceFile("~/cron.log") This will split the big files up into multiple segments (1 for each node). Then for each segment, it will read through it chunk by chunk into memory, and then deposits it into the respective nodes. Finally, it truncates the original files down to its segment boundary. The main goal of this is so that you can analyze a single big (say 200GB) file quickly. If that file is on a single node, then it will take forever, even with :class:`applyMp`. So splitting things up on multiple nodes will make analyzing it a lot faster. There's also the function :meth:`balanceFolder`, which has the opposite problem of having lots of small (say 100MB) files. So it will try to move files around (keeping them intact in the meantime) to different nodes so that the folder size ratio is roughly proportional to the cpu count. The exact split rule depends on the number of CPUs of each node. Best to see an example:: Command: applyCl.balanceFile("~/cron.log") Verbose command: applyCl.balanceFile("~/cron.log", ["1"], ["1", "2", "3", "4", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 52 0 0 0 0 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 This also works if you have files on existing nodes already, and are upgrading the cluster:: Command: applyCl.balanceFile("~/cron.log") Verbose command: applyCl.balanceFile("~/cron.log", ["1", "5"], ["1", "2", "3", "4", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 26 0 0 26 0 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 If you want to move files out of a node when decommissioning them, you can do something like this:: Command: applyCl.decommission("~/cron.log", ["3", "4"]) Verbose command: applyCl.balanceFile("~/cron.log", ["1", "2", "3", "4", "5"], ["1", "2", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 15 22 0 0 15 Remember that the node ids "1", etc. is for illustrative purposes only. You should get real node ids from :meth:`nodeIds`. Why is the file size proportional to the number of cores on each node? Well, if you have more cores, you should be able to process more, so as to make everything balanced, right? Again, this means that you can split arbitrarily large files as long as you have the disk space for it, ram size is not a concern. How does this perform? Not the best in the world if you don't have a lot of nodes. With sata 3 ssds, 750MB/s ethernet, I got transfer speeds of roughly 100MB/s. This should increase as you have more nodes based on the code structure, but I haven't tested it yet. Can it be faster? Definitely. Am I willing to spend time optimizing it? No. :param fn: file name :param nAs: node ids that currently stores the file. If not specified, try to detect what nodes the file exists in :param nBs: node ids that will store the file after balancing everything out. If not specified, will take all available nodes :param rS: :class:`~k1lib.cli.inp.refineSeek` instance, if you need more fine-grained control over section boundaries so as to not make everything corrupted
Balances all files within a folder across all nodes. Example:: base = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.balance" # deletes old structures and making test folder applyCl.cmd(f"rm -r {base}"); applyCl.cmd(f"mkdir -p {base}") # creates 20 files of different sizes and dump it in the base folder of the current node torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx}.txt")) | deref(); # transfers files between nodes such that the total folder size is proportional to the number of cpus across nodes applyCl.balanceFolder(base) # get folder size of all nodes None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref() # creates 20 additional files and dump it to the current node torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx+20}.txt")) | deref(); # balances the tree out again applyCl.balance(base) # get folder size of all nodes None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref() So imagine that you just downloaded 1000 files to a single node on a specific folder, but you need to analyze all of them in a distributed manner. What you can do is to move some files to other nodes and then do your analysis. If you want to download more files, just dump it to any node (or download distributed across all nodes), then rebalance the folders and do your analysis. :param folder: folder to rebalance all of the files :param maxSteps: what's the maximum number of file transfers? By default has no limit, so that files are transferred until :param audit: if True, don't actually move files around and just return what files are going to be moved where
Reads a file distributedly, does some operation on them, collects and returns all of the data together. Example:: fn = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.cat.data" ("0123456789"*5 + "\n") * 1000 | file(fn) applyCl.splitFile(fn) applyCl.cat(fn, shape(0), keepNodeIds=True) | deref() That returns something like this (for a 2-node cluster, with 2 (node A) and 4 (node B) cpus respectively):: [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167], ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167]] Here, we're creating an initial file with 1000 lines. Then we'll split it up into 2 fragments: 334 lines and 667 lines and store them on the respective nodes. Then, on node A, we'll split the file up into 2 parts, each with 167 lines. On node B, we'll split the file up into 4 parts, each with around 166 lines. Then we'll schedule 6 processes total, each dealing with 166 lines. After all of that, results are collected together and returned. If you want to distinguish between different processes inside f, for example you want to write results into different files, you can do something like this:: dir_ = "~/repos/labs/k1lib/k1lib/cli/test" fn = f"{dir_}/applyCl.cat.data" applyCl.cmd(f"rm -r {dir_}/applyCl") # clear out old folders applyCl.cmd(f"mkdir -p {dir_}/applyCl") # creating folders # do processing on fn distributedly, then dump results into multiple files applyCl.cat(fn, ~aS(lambda idx, lines: lines | shape(0) | aS(dill.dumps) | file(f"{dir_}/applyCl/{idx}.pth")), includeId=True) | deref() # reading all files and summing them together None | applyCl.aS(lambda: ls(f"{dir_}/applyCl")) | ungroup(single=True, begin=True) | applyCl(cat(text=False) | aS(dill.loads), pre=True) | cut(1) | toSum() .. admonition:: Simple mode There's also another mode that's activated whenever f is not specified that feels more like vanilla :class:`~inp.cat`. Say you have a file on a specific node:: nodeId = "7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d" fn = "~/ssd2/randomFile.txt" # -------------- file is on current node -------------- cat(fn) # returns iterator of lines inside the file fn | cat() # same thing as above # -------------- file is on remote node -------------- [nodeId, fn] | applyCl.cat() # returns iterator of lines of the file applyCl.cat([nodeId, fn]) # same thing nodeId | applyCl.cat(fn) # also same thing So yeah, there're lots of ways to just simply read a file on a remote node. Is it too much? Probably, but good thing is that you can pick any that's intuitive for you. Note that this mode is just for convenience only, for when you want to do exploratory analysis on a single remote file. To be efficient at bulk processing, use the normal mode instead. :param fn: file name :param f: function to execute in every process :param timeout: kills the processes if it takes longer than this amount of seconds :param keepNodeIds: whether to keep the node id column or not :param multiplier: by default, each node will spawn as many process as there are cpus. Sometimes you want to spawn more process, change this to a higher number :param includeId: includes a unique id for this process
Convenience function to execute shell command on all nodes. Example:: applyCl.cmd("mkdir -p /some/folder") It returns [[nodeid1, output1], [nodeid2, output2]]. If you need more flexibility, fall back to :meth:`applyCl.aS` :param s: shell command to execute :param sudo: if True, will execute the command with sudo privileges. Will ask for password and then cache it internally for 5 minutes
Grabs the number of cpus available on this node
Convenience function for :meth:`balanceFile`. See docs over there.
Scans for files and folders in the specified folder for potential distributed files and folders. A distributed file is a file that exists on more than 1 node. A distributed folder is a folder that that exists on more than 1 node and does not have any shared children. Example:: applyCl.diskScan("~/ssd2") applyCl.diskScan("~/ssd2", True) The first line does not return anything, but will print out something like this: .. include:: ../literals/diskScan.rst While the second line will return a parseable data structure instead:: [[['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity', [4113489746, 7912834090, 4164314316]], ['/home/kelvin/ssd2/data/genome/go/release_geneontology_org', [2071645117, 4172737915, 2107005131]], ['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity.backup', [568878496, 552888466, 600610083]], ['/home/kelvin/ssd2/data/genome/00-common_all.idx', [341738564, 671136833, 0]], ['/home/kelvin/ssd2/data/genome/genbank/ch1.dat.gz', [25356744, 0, 25356764]], ['/home/kelvin/ssd2/test', [136152, 273530, 136351]], ['/home/kelvin/ssd2/data/genome/genbank/ch1', [0, 0, 0]]], [['/home/kelvin/ssd2/data/genome/dummy.txt', [1101, 1101, 1101]]], [['/home/kelvin/ssd2/data/genome/00-All.vcf', [32737509360, 65475018903, 32737509588]], ['/home/kelvin/ssd2/data/genome/MotifFeatures/homo_sapiens.GRCh38.motif_features.gff', [13963854962, 27927709895, 13963854962]], ['/home/kelvin/ssd2/data/genome/00-common_all.vcf', [2353901811, 4707803470, 2353901831]]]] Remember that since an operating system usually have lots of shared files (like "~/.bashrc", for example), these might be mistaken as a distributed file. Make sure to only scan folders that you store data in, or else it'll take a long time to return. :param folder: the folder to scan through :param raw: whether to return raw data or display it out nicely
Downloads a file distributedly to a specified folder. Example:: url = "https://vim.kelvinho.org" fn = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.download" # file/folder name applyCl.download(url, fn) # will download distributedly and dump file fragments into the folder fn applyCl.download(url, fn, True) # same as above, but collects all fragments together, places it in fn in the current node, then deletes the temporary file fragments This only works if the server allows partial downloads btw. :param url: url to download :param folder: which folder to download parts into :param merge: whether to merge all of the fragments together into a single file in the current node or not :param timeout: timeout for each process :param chunkTimeout: timeout for each file chunk inside each process
Grabs the metadata object for the current node
Returns current node id
Returns a list of all node ids in the current cluster. Example:: applyCl.nodeIds() # returns something like ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', '1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068'] If you want to get nodes' metadata, then just use ray's builtin function ``ray.nodes()`` :param includeSelf: whether to include node id of the current process or not
Replicates a specific file in the current node to all the other nodes. Example:: applyCl.replicate("~/cron.log") Internally, this will read chunks of 100kB of the specified file and dump it incrementally to all other nodes, which has implications on performance. To increase or decrease it, check out :class:`~k1lib.cli.inp.cat`. This also means you can replicate arbitrarily large files around as long as you have the disk space for it, while ram size doesn't really matter :param fn: file name
This is what my cluster looks like now:
applyCl.cmd("lscpu") | apply(fmt.h, 0, level=3) | apply(join("\n") | aS(fmt.pre), 1) | join("\n").all() | aS(viz.Carousel)
Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Address sizes: 39 bits physical, 48 bits virtual Byte Order: Little Endian CPU(s): 8 On-line CPU(s) list: 0-7 Vendor ID: GenuineIntel Model name: Intel(R) Core(TM) i7-7700 CPU @ 3.60GHz CPU family: 6 Model: 158 Thread(s) per core: 2 Core(s) per socket: 4 Socket(s): 1 Stepping: 9 CPU max MHz: 4200.0000 CPU min MHz: 800.0000 BogoMIPS: 7200.00 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault epb invpcid_single pti ssbd ibrs ibpb stibp tpr_shadow vnmi flexpriority ept vpid ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid mpx rdseed adx smap clflushopt intel_pt xsaveopt xsavec xgetbv1 xsaves dtherm ida arat pln pts hwp hwp_notify hwp_act_window hwp_epp md_clear flush_l1d arch_capabilities Virtualization: VT-x L1d cache: 128 KiB (4 instances) L1i cache: 128 KiB (4 instances) L2 cache: 1 MiB (4 instances) L3 cache: 8 MiB (1 instance) NUMA node(s): 1 NUMA node0 CPU(s): 0-7 Vulnerability Itlb multihit: KVM: Mitigation: VMX disabled Vulnerability L1tf: Mitigation; PTE Inversion; VMX conditional cache flushes, SMT vulnerable Vulnerability Mds: Mitigation; Clear CPU buffers; SMT vulnerable Vulnerability Meltdown: Mitigation; PTI Vulnerability Mmio stale data: Mitigation; Clear CPU buffers; SMT vulnerable Vulnerability Retbleed: Mitigation; IBRS Vulnerability Spec store bypass: Mitigation; Speculative Store Bypass disabled via prctl and seccomp Vulnerability Spectre v1: Mitigation; usercopy/swapgs barriers and __user pointer sanitization Vulnerability Spectre v2: Mitigation; IBRS, IBPB conditional, RSB filling, PBRSB-eIBRS Not affected Vulnerability Srbds: Mitigation; Microcode Vulnerability Tsx async abort: Mitigation; TSX disabled
Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Address sizes: 39 bits physical, 48 bits virtual Byte Order: Little Endian CPU(s): 8 On-line CPU(s) list: 0-7 Vendor ID: GenuineIntel Model name: Intel(R) Core(TM) i7-7700 CPU @ 3.60GHz CPU family: 6 Model: 158 Thread(s) per core: 2 Core(s) per socket: 4 Socket(s): 1 Stepping: 9 CPU max MHz: 4200.0000 CPU min MHz: 800.0000 BogoMIPS: 7200.00 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault epb invpcid_single pti ssbd ibrs ibpb stibp tpr_shadow vnmi flexpriority ept vpid ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid mpx rdseed adx smap clflushopt intel_pt xsaveopt xsavec xgetbv1 xsaves dtherm ida arat pln pts hwp hwp_notify hwp_act_window hwp_epp md_clear flush_l1d arch_capabilities Virtualization: VT-x L1d cache: 128 KiB (4 instances) L1i cache: 128 KiB (4 instances) L2 cache: 1 MiB (4 instances) L3 cache: 8 MiB (1 instance) NUMA node(s): 1 NUMA node0 CPU(s): 0-7 Vulnerability Itlb multihit: KVM: Mitigation: VMX disabled Vulnerability L1tf: Mitigation; PTE Inversion; VMX conditional cache flushes, SMT vulnerable Vulnerability Mds: Mitigation; Clear CPU buffers; SMT vulnerable Vulnerability Meltdown: Mitigation; PTI Vulnerability Mmio stale data: Mitigation; Clear CPU buffers; SMT vulnerable Vulnerability Retbleed: Mitigation; IBRS Vulnerability Spec store bypass: Mitigation; Speculative Store Bypass disabled via prctl and seccomp Vulnerability Spectre v1: Mitigation; usercopy/swapgs barriers and __user pointer sanitization Vulnerability Spectre v2: Mitigation; IBRS, IBPB conditional, RSB filling, PBRSB-eIBRS Not affected Vulnerability Srbds: Mitigation; Microcode Vulnerability Tsx async abort: Mitigation; TSX disabled
Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Address sizes: 46 bits physical, 48 bits virtual Byte Order: Little Endian CPU(s): 32 On-line CPU(s) list: 0-31 Vendor ID: GenuineIntel Model name: 13th Gen Intel(R) Core(TM) i9-13900K CPU family: 6 Model: 183 Thread(s) per core: 2 Core(s) per socket: 24 Socket(s): 1 Stepping: 1 CPU max MHz: 5800.0000 CPU min MHz: 800.0000 BogoMIPS: 5990.40 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf tsc_known_freq pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault invpcid_single ssbd ibrs ibpb stibp ibrs_enhanced tpr_shadow vnmi flexpriority ept vpid ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid rdseed adx smap clflushopt clwb intel_pt sha_ni xsaveopt xsavec xgetbv1 xsaves split_lock_detect avx_vnni dtherm ida arat pln pts hwp hwp_notify hwp_act_window hwp_epp hwp_pkg_req umip pku ospke waitpkg gfni vaes vpclmulqdq tme rdpid movdiri movdir64b fsrm md_clear serialize pconfig arch_lbr flush_l1d arch_capabilities Virtualization: VT-x L1d cache: 896 KiB (24 instances) L1i cache: 1.3 MiB (24 instances) L2 cache: 32 MiB (12 instances) L3 cache: 36 MiB (1 instance) NUMA node(s): 1 NUMA node0 CPU(s): 0-31 Vulnerability Itlb multihit: Not affected Vulnerability L1tf: Not affected Vulnerability Mds: Not affected Vulnerability Meltdown: Not affected Vulnerability Mmio stale data: Not affected Vulnerability Retbleed: Not affected Vulnerability Spec store bypass: Mitigation; Speculative Store Bypass disabled via prctl and seccomp Vulnerability Spectre v1: Mitigation; usercopy/swapgs barriers and __user pointer sanitization Vulnerability Spectre v2: Mitigation; Enhanced IBRS, IBPB conditional, RSB filling, PBRSB-eIBRS SW sequence Vulnerability Srbds: Not affected Vulnerability Tsx async abort: Not affected
Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Address sizes: 39 bits physical, 48 bits virtual Byte Order: Little Endian CPU(s): 16 On-line CPU(s) list: 0-15 Vendor ID: GenuineIntel Model name: Intel(R) Core(TM) i7-10700K CPU @ 3.80GHz CPU family: 6 Model: 165 Thread(s) per core: 2 Core(s) per socket: 8 Socket(s): 1 Stepping: 5 CPU max MHz: 5100.0000 CPU min MHz: 800.0000 BogoMIPS: 7599.80 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault invpcid_single ssbd ibrs ibpb stibp ibrs_enhanced tpr_shadow vnmi flexpriority ept vpid ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid mpx rdseed adx smap clflushopt intel_pt xsaveopt xsavec xgetbv1 xsaves dtherm ida arat pln pts hwp hwp_notify hwp_act_window hwp_epp pku ospke md_clear flush_l1d arch_capabilities Virtualization: VT-x L1d cache: 256 KiB (8 instances) L1i cache: 256 KiB (8 instances) L2 cache: 2 MiB (8 instances) L3 cache: 16 MiB (1 instance) NUMA node(s): 1 NUMA node0 CPU(s): 0-15 Vulnerability Itlb multihit: KVM: Mitigation: VMX disabled Vulnerability L1tf: Not affected Vulnerability Mds: Not affected Vulnerability Meltdown: Not affected Vulnerability Mmio stale data: Mitigation; Clear CPU buffers; SMT vulnerable Vulnerability Retbleed: Mitigation; Enhanced IBRS Vulnerability Spec store bypass: Mitigation; Speculative Store Bypass disabled via prctl and seccomp Vulnerability Spectre v1: Mitigation; usercopy/swapgs barriers and __user pointer sanitization Vulnerability Spectre v2: Mitigation; Enhanced IBRS, IBPB conditional, RSB filling, PBRSB-eIBRS SW sequence Vulnerability Srbds: Mitigation; Microcode Vulnerability Tsx async abort: Not affected
applyCl.diskScan("~/ssd2")
------------------------------------------------------------ Distributed folders ------------------------------------------------------------ Path Total size Size on each node (node id and thread count) 2fd74, 8 thr 901f1, 8 thr 7c0fd, 32 thr 82db3, 16 thr ---------------------------------------- ---------- ------------ ------------ ------------ ------------ /home/kelvin/ssd2/data/genome/RegulationFeatureActivity 16.19 GB 1.24 GB 1.24 GB 10.83 GB 2.88 GB /home/kelvin/ssd2/data/genome/go/release_geneontology_org 8.35 GB 624.98 MB 573.72 MB 5.85 GB 1.3 GB /home/kelvin/ssd2/data/genome/RegulationFeatureActivity.backup 1.72 GB 131.93 MB 131.45 MB 1.15 GB 306.2 MB /home/kelvin/ssd2/data/genome/00-common_all.idx 1.01 GB 0.0 B 341.74 MB 0.0 B 671.14 MB /home/kelvin/ssd2/test 255.86 MB 17.9 MB 20.47 MB 171.17 MB 46.31 MB /home/kelvin/ssd2/data/genome/genbank/ch1.dat.gz 50.71 MB 25.36 MB 25.36 MB 0.0 B 0.0 B /home/kelvin/ssd2/data/genome/genbank/ch1 0.0 B 0.0 B 0.0 B 0.0 B 0.0 B A distributed folder is a folder that has many files and folders inside, but their names are all different from each other. It's managed by applyCl.balanceFolder() ------------------------------------------------------------ Replicated files ------------------------------------------------------------ Path Total size Size on each node (node id and thread count) 2fd74, 8 thr 901f1, 8 thr 7c0fd, 32 thr 82db3, 16 thr ---------------------------------------- ---------- ------------ ------------ ------------ ------------ /home/kelvin/ssd2/data/genome/dummy.txt 4.4 kB 1.1 kB 1.1 kB 1.1 kB 1.1 kB A replicated file is a file that has been copied to multiple nodes. Size of all file copies should be the same. It's managed by applyCl.replicateFile() ------------------------------------------------------------ Distributed files ------------------------------------------------------------ Path Total size Size on each node (node id and thread count) 2fd74, 8 thr 901f1, 8 thr 7c0fd, 32 thr 82db3, 16 thr ---------------------------------------- ---------- ------------ ------------ ------------ ------------ /home/kelvin/ssd2/data/genome/00-All.vcf 130.95 GB 32.74 GB 32.74 GB 0.0 B 65.48 GB /home/kelvin/ssd2/data/genome/MotifFeatures/homo_sapiens.GRCh38.motif_features.gff 55.86 GB 4.26 GB 4.26 GB 37.24 GB 10.11 GB /home/kelvin/ssd2/data/genome/00-common_all.vcf 9.42 GB 666.59 MB 666.59 MB 6.58 GB 1.5 GB A distributed file is a file that has been split into multiple pieces and sent to other nodes. It's managed by applyCl.balanceFile()
Now let's see the output of some commands that analyze distributed files and folders, shall we?
In this model, you have a lot (1000-10000) of small files (~10MB each) that you want to analyze quickly. So, what you can do is first download all files to 1 node, then run applyCl.balanceFolder() to move a bunch of files to other nodes. The total size on disk for each node should be proportional to the cpu count/performance of that node, so that more powerful nodes can process more data. In this example, the folder has already been balanced and we're just trying to analyze those files. Refer back to the diskScan output above for the exact setup.
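To recap the workflow in code, here's a sketch with a hypothetical folder name (the balancing step only needs to run once, right after downloading):
base2 = "~/ssd2/someNewFolder"   # hypothetical folder that all files were downloaded into on the current node
applyCl.balanceFolder(base2)     # spread the files out so each node's share is proportional to its cpu count
applyCl.diskScan("~/ssd2")       # verify that the folder now shows up as a distributed folder
# then analyze: list the files on every node, and process each file on the node that holds it
None | applyCl.aS(lambda: ls(base2)) | ungroup() | applyCl(cat() | shape(0), pre=True) | cut(1) | toSum()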
from k1lib.imports import *
2023-05-24 16:11:13,341 INFO worker.py:1364 -- Connecting to existing Ray cluster at address: 192.168.1.35:6379...
2023-05-24 16:11:13,346 INFO worker.py:1544 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
base = "~/ssd2/data/genome/RegulationFeatureActivity"
fns = None | applyCl.aS(lambda: ls(base) | head(3)) | deref(); fns
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', ['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.mononuclear_PB.Regulatory_Build.regulatory_activity.20221007.gff', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.naive_B_VB.Regulatory_Build.regulatory_activity.20221007.gff', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.leg_muscle.Regulatory_Build.regulatory_activity.20221007.gff']], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', ['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.HUES64.Regulatory_Build.regulatory_activity.20221007.gff', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.GM12878.Regulatory_Build.regulatory_activity.20221007.gff', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.CMP_CD4_2.Regulatory_Build.regulatory_activity.20221007.gff']], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', ['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.thymus_2.Regulatory_Build.regulatory_activity.20221007.gff', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.thymus_1.Regulatory_Build.regulatory_activity.20221007.gff', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.HUES6.Regulatory_Build.regulatory_activity.20221007.gff']], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', ['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.T_PB.Regulatory_Build.regulatory_activity.20221007.gff', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.bipolar_neuron.Regulatory_Build.regulatory_activity.20221007.gff', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.UCSF_4.Regulatory_Build.regulatory_activity.20221007.gff']]]
Here we just list out the top 3 files in the folder ~/ssd2/data/genome/RegulationFeatureActivity on each node. How many files does each node have in total?
None | applyCl.aS(lambda: ls(base) | shape(0)) | deref()
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 9], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 9], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 79], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 21]]
118 files total, which is quite a lot. Then we're going to transform it into a form we can feed into applyCl():
fns2 = fns | ungroup() | deref(); fns2 | head(5)
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.mononuclear_PB.Regulatory_Build.regulatory_activity.20221007.gff'], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.naive_B_VB.Regulatory_Build.regulatory_activity.20221007.gff'], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.leg_muscle.Regulatory_Build.regulatory_activity.20221007.gff'], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.HUES64.Regulatory_Build.regulatory_activity.20221007.gff'], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', '/home/kelvin/ssd2/data/genome/RegulationFeatureActivity/homo_sapiens.GRCh38.GM12878.Regulatory_Build.regulatory_activity.20221007.gff']]
Then, let's read a few of the files, grabbing the 51st line of each and outputting them:
fns2 | head(5) | applyCl(lambda path: cat(path) | ~head(50) | item(), pre=True) | deref()
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in mononuclear (PB);epigenome=mononuclear (PB);feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921'], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in naive B (VB);epigenome=naive B (VB);feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921'], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in leg muscle;epigenome=leg muscle;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921'], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in HUES64;epigenome=HUES64;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921'], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=ACTIVE;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site active in GM12878;epigenome=GM12878;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921']]
It's a bit ugly to see all of the node ids. Cleaning them up:
fns2 | applyCl(lambda path: cat(path) | ~head(50) | item(), pre=True) | cut(1) | deref()
['20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in mononuclear (PB);epigenome=mononuclear (PB);feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in naive B (VB);epigenome=naive B (VB);feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in leg muscle;epigenome=leg muscle;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in HUES64;epigenome=HUES64;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=ACTIVE;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site active in GM12878;epigenome=GM12878;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in CMP CD4+_2;epigenome=CMP CD4+_2;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in thymus_2;epigenome=thymus_2;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in thymus_1;epigenome=thymus_1;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in HUES6;epigenome=HUES6;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in T (PB);epigenome=T (PB);feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=INACTIVE;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site inactive in bipolar neuron;epigenome=bipolar neuron;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921', '20\tRegulatory_Build\tCTCF_binding_site\t54091201\t54091400\t.\t.\t.\tactivity=NA;bound_end=54091400;bound_start=54091201;description=CTCF Binding Site na in UCSF-4;epigenome=UCSF-4;feature_type=CTCF Binding Site;regulatory_feature_stable_id=ENSR00000654921']
Now that you understand the basics, let's try to grab the histogram of the 2nd column (0-indexed):
%%time
None | applyCl.aS(lambda: ls(base)) | ungroup() | applyCl(cat() | table() | cut(2) | count(), pre=True) | cut(1) | count.join() | ~sort() | deref()
CPU times: user 74.3 ms, sys: 22.8 ms, total: 97.1 ms Wall time: 1.74 s
[[31680994, 'enhancer', '49%'], [13053514, 'open_chromatin_region', '20%'], [12004612, 'CTCF_binding_site', '19%'], [4318446, 'promoter', '7%'], [3643014, 'TF_binding_site', '6%']]
Now would you look at that! 118 files, totalling around 16GB, processed in just 1.74 seconds. That's 9.2GB/s, which is pretty insane, don't you think? For comparison, let's see the speed of a single thread:
%%time
ls(base) | randomize(None) | head(20) | (apply(os.path.getsize) | toSum() | op()/1e9) & (apply(cat() | table() | cut(2) | count()) | count.join() | iden() & (cut(0) | toSum())) | deref()
CPU times: user 9.6 s, sys: 315 ms, total: 9.92 s Wall time: 9.94 s
[2.711626156, [[[5369660, 'enhancer', '49%'], [617460, 'TF_binding_site', '6%'], [2034680, 'CTCF_binding_site', '19%'], [2212460, 'open_chromatin_region', '20%'], [731940, 'promoter', '7%']], 10966200]]
2.7GB worth of files processed in 10s, which works out to 270MB/s. The applyCl version is around 34x faster, so that's pretty dope. Just for fun, how about raw read performance?
%%time
None | applyCl.aS(lambda: ls(base)) | ungroup() | applyCl(cat() | shape(0), pre=True) | cut(1) | toSum()
CPU times: user 41.5 ms, sys: 19.9 ms, total: 61.3 ms Wall time: 387 ms
64700580
Woah. 16GB in 387ms, or 41.3GB/s. Quite insane.
In this model, you have 1 giant (text) file (around 10-200GB) that you want to analyze quickly. So, what you can do is first download the file to 1 node, then run applyCl.balanceFile() to split the file into multiple chunks and move those chunks to the same file path on other nodes. The total size on disk for each node should be proportional to the cpu count/performance of that node, so that more powerful nodes can process more data. In this example, the file has already been balanced and we're just trying to analyze it. Refer back to the diskScan output above for the exact setup.
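In code, the model looks roughly like this (a sketch with a hypothetical file name; balancing only has to happen once):
fn2 = "~/ssd2/someGiantFile.txt"      # hypothetical giant file, downloaded onto the current node
applyCl.balanceFile(fn2)              # split it up and scatter the pieces to the same path on the other nodes
applyCl.cat(fn2, shape(0)) | toSum()  # then run a function over every fragment where it lives, e.g. count total lines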
from k1lib.imports import *
2023-05-24 14:22:51,520 INFO worker.py:1364 -- Connecting to existing Ray cluster at address: 192.168.1.35:6379...
2023-05-24 14:22:51,524 INFO worker.py:1544 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
fn = "/home/kelvin/ssd2/data/genome/MotifFeatures/homo_sapiens.GRCh38.motif_features.gff"; cat(fn) | headOut(3)
1 . TF_binding_site 10001 10016 0.531598334592 + . binding_matrix_stable_id=ENSPFM0328;stable_id=ENSM01052001449;transcription_factor_complex=HOXB2::RFX5
1 . TF_binding_site 10001 10018 1.21061634576 - . binding_matrix_stable_id=ENSPFM0542;stable_id=ENSM00208374688;transcription_factor_complex=TEAD4::ESRRB
1 . TF_binding_site 10002 10016 -7.62367795095 - . binding_matrix_stable_id=ENSPFM0220;stable_id=ENSM01051801340;transcription_factor_complex=GCM1::ELF1%2CGCM2::ELK1%2CGCM1::ETV1%2CGCM1::ETV4%2CGCM1::ELK3%2CGCM1::ERG
Now this file is serious. 56GB total across 4 nodes. How many lines does this file have in total?
%%time
applyCl.cat(fn, shape(0)) | toSum()
CPU times: user 50 ms, sys: 2.69 ms, total: 52.7 ms Wall time: 1.27 s
330097444
330 million lines. That's a lot. Also notice our running time is 1.27s, which works out to a processing speed of 44GB/s, quite insane, just like the raw read performance of the distributed folder. Most likely this is due to Linux caching parts of the file in memory, as I only have 2 NVMe and 2 SATA3 drives. Let's again grab the 0th column, which tells us which chromosome each motif/line belongs to:
%%time
applyCl.cat(fn, table() | cut(0) | toInt() | count()) | count.join() | sort(1) | permute(1, 0) | transpose() | ~aS(plt.bar)
plt.xlabel("Chromosome"); plt.ylabel("#motifs");
CPU times: user 95 ms, sys: 18.6 ms, total: 114 ms Wall time: 10.3 s
56GB processed in 10.3s for a processing speed of 5.44GB/s. Doing it on 1 thread:
%%time
cat(fn) | head(1_000_000) | table() | cut(0) | toInt() | count()
CPU times: user 1.28 s, sys: 4.87 ms, total: 1.29 s Wall time: 1.29 s
[[1000000, 1, '100%']]
cat(fn) | head(1_000_000) | shape(0).all() | toSum() | op()/1e6
165.425528
165MB processed in 1.29s for a processing speed of 127MB/s. This is 43x slower than the applyCl version, which matches our observation above nicely.
Imagine this situation. You have a distributed file. The files themselves are relatively raw, meaning you need to transform them into a more usable form inside Python, and this transformation takes 30s. But this intermediate format, the transformed version, is pretty useful and you want to do a lot of further analysis with it. It'd be pretty annoying to have to rerun the transformation from the raw files again and again whenever you want to run a new analysis, so instead of doing that, you can store the transformed objects in memory on the other nodes and just keep them there. Then you can run the analyses distributedly as well, on those distributed in-memory objects. Let's see an example.
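First, a minimal sketch of the pattern (transform, analysis1 and analysis2 are hypothetical placeholders for your own pipelines); the real walkthrough follows right after:
refs = applyCl.cat(fn, transform, resolve=False, pre=True) | deref()   # run the expensive transform once; results stay on each node as ray object refs
refs | applyCl(analysis1, pre=True) | cut(1) | deref()                 # later analyses run right next to the cached objects...
refs | applyCl(analysis2, pre=True) | cut(1) | deref()                 # ...so only the (small) results travel back to this node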
from k1lib.imports import *
parseFile = table() | cut(-1) | apply(op().split(";") | op().split("=").all() | toDict())
fn = "/home/kelvin/ssd2/data/genome/MotifFeatures/homo_sapiens.GRCh38.motif_features.gff"; cat(fn) | headOut(3)
2023-05-24 15:36:58,192 INFO worker.py:1364 -- Connecting to existing Ray cluster at address: 192.168.1.35:6379...
2023-05-24 15:36:58,196 INFO worker.py:1544 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
1 . TF_binding_site 10001 10016 0.531598334592 + . binding_matrix_stable_id=ENSPFM0328;stable_id=ENSM01052001449;transcription_factor_complex=HOXB2::RFX5
1 . TF_binding_site 10001 10018 1.21061634576 - . binding_matrix_stable_id=ENSPFM0542;stable_id=ENSM00208374688;transcription_factor_complex=TEAD4::ESRRB
1 . TF_binding_site 10002 10016 -7.62367795095 - . binding_matrix_stable_id=ENSPFM0220;stable_id=ENSM01051801340;transcription_factor_complex=GCM1::ELF1%2CGCM2::ELK1%2CGCM1::ETV1%2CGCM1::ETV4%2CGCM1::ELK3%2CGCM1::ERG
Let's grab the last column, transform it into a dict, and store it distributedly:
%%time
a = applyCl.cat(fn, parseFile | aS(list), resolve=False, pre=True, multiplier=10) | deref()
CPU times: user 2.51 s, sys: 599 ms, total: 3.11 s Wall time: 2min 51s
That took longer than I'd like. This is what it looks like:
a | head(3), a | shape()
([['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', ObjectRef(0585763f128d2ab3ffffffffffffffffffffffff4500000001000000)], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', ObjectRef(89e9d7a9e6eaa4faffffffffffffffffffffffff4500000001000000)], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', ObjectRef(c158ab31c0b3f29fffffffffffffffffffffffff4500000001000000)]], (640, 2, 56))
Just a bunch of ray object references that you can pass around later on.
%%time
a | applyCl(lambda x: x | shape(0), pre=True) | cut(1) | toSum()
CPU times: user 631 ms, sys: 195 ms, total: 827 ms Wall time: 9.11 s
330097384
Yep. All the data is there. Note how it still takes a while (9.11s) just to count the lines of the stored objects, while reading straight from disk is faster:
%%time
applyCl.cat(fn, shape(0)) | toSum()
CPU times: user 50.6 ms, sys: 9.99 ms, total: 60.5 ms Wall time: 1.59 s
330097444
Let's time the execution stage alone, without the phase that creates the lists and stores them:
%%time
applyCl.cat(fn, parseFile | shape(0), pre=True, multiplier=10) | cut(1) | toSum()
CPU times: user 747 ms, sys: 130 ms, total: 877 ms Wall time: 25.8 s
330097384
Running the analysis without storing is around 6.6x faster. Memory allocation takes a whole lot of time! Let's now try running analyses both on the raw files and on the structured data stored in RAM, distributedly.
%%time
a | applyCl(op().keys().all() | joinStreams() | count(), pre=True) | cut(1) | count.join() | deref()
CPU times: user 852 ms, sys: 173 ms, total: 1.02 s Wall time: 13.9 s
[[330097384, 'binding_matrix_stable_id', '33%'], [330097384, 'stable_id', '33%'], [330097384, 'transcription_factor_complex', '33%'], [5401733, 'epigenomes_with_experimental_evidence', '1%']]
%%time
applyCl.cat(fn, parseFile | op().keys().all() | joinStreams() | count()) | count.join() | deref()
CPU times: user 504 ms, sys: 19 ms, total: 523 ms Wall time: 34.2 s
[[330097384, 'binding_matrix_stable_id', '33%'], [330097384, 'stable_id', '33%'], [330097384, 'transcription_factor_complex', '33%'], [5401733, 'epigenomes_with_experimental_evidence', '1%']]
2.5x faster if stored in RAM!
%%time
a | applyCl(apply(op()["transcription_factor_complex"].split("%2C") | apply(op().split("::"))) | joinStreams(2) | count(), pre=True) | cut(1) | count.join() | ~sort() | shape() & display() | deref()
44397093 TEAD4 4%
34397157 ETV2 3%
29245345 ELK1 3%
27354052 HOXB2 3%
26873920 GCM1 3%
24330575 TBX21 2%
24329900 HOXB13 2%
21495614 EOMES 2%
18788514 FOXI1 2%
16107691 ERF 2%
CPU times: user 1.03 s, sys: 209 ms, total: 1.24 s Wall time: 23.7 s
[[409, 3], None]
%%time
applyCl.cat(fn, parseFile | apply(op()["transcription_factor_complex"].split("%2C") | apply(op().split("::"))) | joinStreams(2) | count()) | count.join() | ~sort() | shape() & display() | deref()
44397093 TEAD4 4%
34397157 ETV2 3%
29245345 ELK1 3%
27354052 HOXB2 3%
26873920 GCM1 3%
24330575 TBX21 2%
24329900 HOXB13 2%
21495614 EOMES 2%
18788514 FOXI1 2%
16107691 ERF 2%
CPU times: user 517 ms, sys: 66.1 ms, total: 583 ms Wall time: 45.4 s
[[409, 3], None]
Around 1.9x faster if stored in RAM.
So, is it worth it? Could be. The reason the code that stores data in RAM isn't dramatically faster is that it has to allocate lots of memory, which takes a lot of time. Keeping big data structures in RAM also means you're thrashing the cache a lot, which slows things down further. We're storing lots of data here because the transformation doesn't really cut the data down from what's on disk. But if you have a use case where the transformation cuts the data down a lot, then I'd expect it to be super fast.
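For instance, if all you ever needed downstream were the key counts, you could cache just those small per-chunk count tables instead of the full parsed dicts (a sketch, reusing parseFile from above):
b = applyCl.cat(fn, parseFile | op().keys().all() | joinStreams() | count(), resolve=False, pre=True) | deref()  # each chunk now stores only a small count table
b | applyCl(lambda x: x, pre=True) | cut(1) | count.join() | deref()   # merging those tiny tables afterwards should be nearly instant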
from k1lib.imports import *
fn = "/home/kelvin/ssd2/data/genome/MotifFeatures/homo_sapiens.GRCh38.motif_features.gff"
2023-05-19 22:05:08,199 INFO worker.py:1364 -- Connecting to existing Ray cluster at address: 192.168.1.35:6379...
2023-05-19 22:05:08,214 INFO worker.py:1544 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
%%time
cat(fn) | head(300_000_000) | shape(0)
CPU times: user 32.5 s, sys: 6.33 s, total: 38.8 s Wall time: 40 s
164842116
os.path.getsize(fn)/1e9
27.927709895
28GB in 40s, works out to be 700MB/s, pretty high.
%%time
fn | splitSeek(2) | window(2) | ~applyMp(lambda sB, eB: cat(fn, sB=sB, eB=eB) | shape(0), timeout=120) | toSum()
CPU times: user 300 ms, sys: 94.1 ms, total: 394 ms Wall time: 13.9 s
164842117
28GB in 14s, or 2GB/s.
%%time
fn | splitSeek(4) | window(2) | ~applyMp(lambda sB, eB: cat(fn, sB=sB, eB=eB) | shape(0), timeout=120) | toSum()
CPU times: user 278 ms, sys: 93.1 ms, total: 372 ms Wall time: 7.19 s
164842119
28GB in 7s, or 4GB/s.
%%time
fn | splitSeek(8) | window(2) | ~applyMp(lambda sB, eB: cat(fn, sB=sB, eB=eB) | shape(0), timeout=120) | toSum()
CPU times: user 266 ms, sys: 97.9 ms, total: 363 ms Wall time: 4.25 s
164842123
28GB in 4.25s, or 6.6GB/s
%%time
fn | splitSeek(16) | window(2) | ~applyMp(lambda sB, eB: cat(fn, sB=sB, eB=eB) | shape(0), timeout=120) | toSum()
CPU times: user 261 ms, sys: 103 ms, total: 364 ms Wall time: 4.52 s
164842131
28GB in 4.52s, or 6.2GB/s. Going from 8 to 16 processes doesn't actually make it faster. Interesting.
%%time
applyCl.nodeIds() | applyCl.aS(lambda: cat(fn) | shape(0), timeout=120) | cut(1) | toSum()
CPU times: user 57 ms, sys: 27 ms, total: 84.1 ms Wall time: 33.5 s
330097385
56GB in 33.5s, or 1.67GB/s
%%time
applyCl.cat(fn, shape(0), timeout=120) | toSum()
CPU times: user 39 ms, sys: 7.09 ms, total: 46.1 ms Wall time: 6.5 s
330097414
56GB in 6.5s, or 8.6GB/s. Just a bit faster than the 16-way applyMp run. The results kinda agree with the previous section, although I was expecting more. Could it be because each core has 2 threads, and I've been assuming there are as many cores as there are threads? Let's make it a bit more compute-heavy instead of just looping through all lines as fast as possible:
cat(fn) | headOut()
1 . TF_binding_site 10001 10016 0.531598334592 + . binding_matrix_stable_id=ENSPFM0328;stable_id=ENSM01052001449;transcription_factor_complex=HOXB2::RFX5
1 . TF_binding_site 10001 10018 1.21061634576 - . binding_matrix_stable_id=ENSPFM0542;stable_id=ENSM00208374688;transcription_factor_complex=TEAD4::ESRRB
1 . TF_binding_site 10002 10016 -7.62367795095 - . binding_matrix_stable_id=ENSPFM0220;stable_id=ENSM01051801340;transcription_factor_complex=GCM1::ELF1%2CGCM2::ELK1%2CGCM1::ETV1%2CGCM1::ETV4%2CGCM1::ELK3%2CGCM1::ERG
1 . TF_binding_site 10005 10026 -1.68578313067 - . binding_matrix_stable_id=ENSPFM0305;stable_id=ENSM01051839556;transcription_factor_complex=HOXB2::ELF1
1 . TF_binding_site 10096 10111 -2.19311089986 + . binding_matrix_stable_id=ENSPFM0167;stable_id=ENSM00207949539;transcription_factor_complex=ETV2::SREBF2
1 . TF_binding_site 10097 10111 1.71712580953 + . binding_matrix_stable_id=ENSPFM0327;stable_id=ENSM00207314130;transcription_factor_complex=HOXB2::RFX5
1 . TF_binding_site 10099 10118 2.729058868 + . binding_matrix_stable_id=ENSPFM0326;stable_id=ENSM01051822885;transcription_factor_complex=HOXB2::RFX5
1 . TF_binding_site 10100 10112 -25.7219096287 + . binding_matrix_stable_id=ENSPFM0014;stable_id=ENSM00524980244;transcription_factor_complex=ATF4::CEBPB%2CATF4::CEBPD%2CATF4::TEF%2CATF4%2CCEBPG::ATF4
1 . TF_binding_site 10101 10112 -1.35283704878 + . binding_matrix_stable_id=ENSPFM0507;stable_id=ENSM00207927997;transcription_factor_complex=SREBF2
1 . TF_binding_site 10101 10112 -6.42154097826 + . binding_matrix_stable_id=ENSPFM0027;stable_id=ENSM00522822927;transcription_factor_complex=CEBPG::CREB3L1
For the processing portion, let's grab the 3rd and 4th columns, turn them into integers and find the difference between them.
process = lambda bins=30: table() | cut(3, 4) | toInt(0, 1) | ~apply(lambda x,y: y-x) | hist(bins)
%%time
cat(fn) | head(10_000_000) | process() | shape()
CPU times: user 29.2 s, sys: 276 ms, total: 29.5 s Wall time: 29.7 s
(3, 30)
cat(fn) | head(10_000_000) | shape(0).all() | toSum() | op()/1e9
1.672442788
1.67GB in 29.7s, or 56MB/s
%%time
fn | splitSeek(1) | window(2) | ~applyMp(lambda sB, eB: cat(fn, sB=sB, eB=eB) | process(300), timeout=1200) | hist.join() | shape()
CPU times: user 2.11 s, sys: 557 ms, total: 2.66 s Wall time: 7min 33s
(3, 30)
os.path.getsize(fn)/1e9
27.927709895
28GB in 453s, or 62MB/s. Actually pretty close to regular single-threaded performance.
%%time
fn | splitSeek(2) | window(2) | ~applyMp(lambda sB, eB: cat(fn, sB=sB, eB=eB) | process(300), timeout=600) | hist.join() | shape()
CPU times: user 1.11 s, sys: 314 ms, total: 1.42 s Wall time: 3min 53s
(3, 30)
28GB in 233s, or 120MB/s
%%time
fn | splitSeek(4) | window(2) | ~applyMp(lambda sB, eB: cat(fn, sB=sB, eB=eB) | process(300), timeout=600) | hist.join() | shape()
CPU times: user 707 ms, sys: 244 ms, total: 951 ms Wall time: 1min 59s
(3, 30)
28GB in 119s, or 235MB/s
%%time
fn | splitSeek(8) | window(2) | ~applyMp(lambda sB, eB: cat(fn, sB=sB, eB=eB) | process(300), timeout=600) | hist.join() | shape()
CPU times: user 632 ms, sys: 166 ms, total: 798 ms Wall time: 1min 16s
(3, 30)
28GB in 76s, or 368MB/s. Only 1.57x that of 4-threads applyMp.
%%time
fn | splitSeek(16) | window(2) | ~applyMp(lambda sB, eB: cat(fn, sB=sB, eB=eB) | process(300), timeout=600) | hist.join() | shape()
CPU times: user 824 ms, sys: 263 ms, total: 1.09 s Wall time: 1min 24s
(3, 30)
28GB in 84s, or 333MB/s, worse than 8-threads applyMp!
%%time
applyCl.cat(fn, process(300), timeout=300) | hist.join() | shape()
CPU times: user 332 ms, sys: 83.4 ms, total: 415 ms Wall time: 1min 7s
(3, 30)
56GB in 74s, or 757MB/s. 2.3x that of 16-threads applyMp, 2.05x that of 8-threads applyMp and 13.5x that of single-threaded. Let's plot things out:
plt.plot([1, 1, 2, 4, 8, 16, 32], [56, 62, 120, 235, 368, 333, 757], "o-");
plt.xscale("log"); plt.yscale("log"); plt.grid(True); plt.xlabel("Threads"); plt.grid(True); plt.ylabel("Throughput (MB/s)");
So what's going on here? Why is there a kink in the middle? Btw, the x and y axes are in log scale, because we're stepping up the number of cores exponentially, and the throughput also increases exponentially. But if we plot the data against the number of cores, not threads, then this happens:
# thumbnail
plt.plot([1, 1, 2, 4, 8, 16], [56, 62, 120, 235, 368, 757], "o-");
plt.xscale("log"); plt.yscale("log"); plt.grid(True); plt.xlabel("Cores"); plt.grid(True); plt.ylabel("Throughput (MB/s)");
That looks pretty damn linear. So it's not that there's performance degradation between 8 threads and 16 threads; it's that fundamentally there are only 8 cores, so both the 8-thread and 16-thread tests use all 8 cores. Besides that, this surprisingly straight line gives me a lot of hope that all of these architectures can scale immensely.
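A quick sanity check on that: taking the throughput numbers from the cores plot above, each step (a doubling of cores) multiplies throughput by roughly 1.6-2x; the comment is just the arithmetic on those numbers:
[62, 120, 235, 368, 757] | window(2) | ~apply(lambda a, b: round(b/a, 2)) | deref()   # roughly [1.94, 1.96, 1.57, 2.06]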
Content below was written before the content above, and walks through some of the examples from the main docs. You don't strictly have to read these if you understood the previous parts already.
Let's see the params and options you get first:
def getSig(o): return f"{o.__name__}{inspect.signature(o)}"
getSig(applyCl.__init__) | aS(print)
applyCl.__init__.__doc__.split("\n") | grep("param", after=1e9) | stdout()
__init__(self, f, prefetch=None, timeout=60, bs=1, rss: Union[dict, str] = {}, pre: bool = False, orPatch=True, num_cpus=1, resolve=True, **kwargs)
:param prefetch: if not specified, schedules all jobs at the same time. If specified, schedules jobs so that there'll only be a specified amount of jobs, and will only schedule more if results are actually being used.
:param timeout: seconds to wait for job before raising an error
:param bs: if specified, groups ``bs`` number of transforms into 1 job to be more efficient.
:param rss: resources required for the task. Can be {"custom_resource1": 2} or "custom_resource1" as a shortcut
:param pre: "preserve", same convention as :meth:`applyCl.aS`. If True, then allow passing through node ids as the first column to shedule jobs on those specific nodes only
:param orPatch: whether to automatically patch __or__ function so that cli tools can work with numpy arrays on that remote worker
:param num_cpus: how many cpu does each task take?
:param resolve: whether to resolve the outputs or not. Set this to False to not move memory to the requesting node and cache the big data structure on the remote node
:param kwargs: extra arguments to be passed to the function. ``args`` not included as there're a couple of options you can pass for this cli.
A lot of unfamiliar options compared to applyMp. There are also quite a few helper functions on applyCl:
dir(applyCl) | ~filt(op().startswith("_")) | apply(lambda x: [x, getattr(applyCl, x).__doc__]) | filt(op(), 1) | ~inSet(["f", "hint", "all"], 0) | apply(aS(lambda x: "applyCl." + getSig(getattr(applyCl, x))) | aS(fmt.h, 3), 0) | apply(fmt.pre, 1) | join("\n").all() | aS(viz.Carousel)
Executes function f once for all node ids that are piped in. Example:: # returns [['1051da...', ['Desktop', 'Downloads']], ['7bb387...', ['Pictures', 'Music']]] applyCl.nodeIds() | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref() # also returns [['1051da...', ['Desktop', 'Downloads']], ['7bb387...', ['Pictures', 'Music']]] None | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref() If you want to execute f for all nodes, you can pass in None instead. As a reminder, this kinda follows the same logic as the popular cli :class:`aS`, where f is executed once, hence the name "apply Single". Here, the meaning of "single" is different. It just means execute once for each node ids. :param f: main function to execute in each node. Not supposed to accept any arguments :param timeout: seconds to wait for job before raising an error :param resolve: whether to resolve the result or not. See main docs at :class:`applyCl`
Splits a specified file in node nAs and dumps other parts to nodes nBs. Example:: applyCl.balanceFile("~/cron.log") This will split the big files up into multiple segments (1 for each node). Then for each segment, it will read through it chunk by chunk into memory, and then deposits it into the respective nodes. Finally, it truncates the original files down to its segment boundary. The main goal of this is so that you can analyze a single big (say 200GB) file quickly. If that file is on a single node, then it will take forever, even with :class:`applyMp`. So splitting things up on multiple nodes will make analyzing it a lot faster. There's also the function :meth:`balanceFolder`, which has the opposite problem of having lots of small (say 100MB) files. So it will try to move files around (keeping them intact in the meantime) to different nodes so that the folder size ratio is roughly proportional to the cpu count. The exact split rule depends on the number of CPUs of each node. Best to see an example:: Command: applyCl.balanceFile("~/cron.log") Verbose command: applyCl.balanceFile("~/cron.log", ["1"], ["1", "2", "3", "4", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 52 0 0 0 0 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 This also works if you have files on existing nodes already, and are upgrading the cluster:: Command: applyCl.balanceFile("~/cron.log") Verbose command: applyCl.balanceFile("~/cron.log", ["1", "5"], ["1", "2", "3", "4", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 26 0 0 26 0 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 If you want to move files out of a node when decommissioning them, you can do something like this:: Command: applyCl.decommission("~/cron.log", ["3", "4"]) Verbose command: applyCl.balanceFile("~/cron.log", ["1", "2", "3", "4", "5"], ["1", "2", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 15 22 0 0 15 Remember that the node ids "1", etc. is for illustrative purposes only. You should get real node ids from :meth:`nodeIds`. Why is the file size proportional to the number of cores on each node? Well, if you have more cores, you should be able to process more, so as to make everything balanced, right? Again, this means that you can split arbitrarily large files as long as you have the disk space for it, ram size is not a concern. How does this perform? Not the best in the world if you don't have a lot of nodes. With sata 3 ssds, 750MB/s ethernet, I got transfer speeds of roughly 100MB/s. This should increase as you have more nodes based on the code structure, but I haven't tested it yet. Can it be faster? Definitely. Am I willing to spend time optimizing it? No. :param fn: file name :param nAs: node ids that currently stores the file. If not specified, try to detect what nodes the file exists in :param nBs: node ids that will store the file after balancing everything out. If not specified, will take all available nodes :param rS: :class:`~k1lib.cli.inp.refineSeek` instance, if you need more fine-grained control over section boundaries so as to not make everything corrupted
Balances all files within a folder across all nodes. Example:: base = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.balance" # deletes old structures and making test folder applyCl.cmd(f"rm -r {base}"); applyCl.cmd(f"mkdir -p {base}") # creates 20 files of different sizes and dump it in the base folder of the current node torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx}.txt")) | deref(); # transfers files between nodes such that the total folder size is proportional to the number of cpus across nodes applyCl.balanceFolder(base) # get folder size of all nodes None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref() # creates 20 additional files and dump it to the current node torch.linspace(1e4, 1e5, 20).int() | apply(lambda x: "x"*x) | insertIdColumn() | ~apply(lambda idx, contents: contents | file(f"{base}/{idx+20}.txt")) | deref(); # balances the tree out again applyCl.balance(base) # get folder size of all nodes None | applyCl.aS(lambda: ls(base) | apply(os.path.getsize) | toSum()) | deref() So imagine that you just downloaded 1000 files to a single node on a specific folder, but you need to analyze all of them in a distributed manner. What you can do is to move some files to other nodes and then do your analysis. If you want to download more files, just dump it to any node (or download distributed across all nodes), then rebalance the folders and do your analysis. :param folder: folder to rebalance all of the files :param maxSteps: what's the maximum number of file transfers? By default has no limit, so that files are transferred until :param audit: if True, don't actually move files around and just return what files are going to be moved where
Reads a file distributedly, does some operation on them, collects and returns all of the data together. Example:: fn = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.cat.data" ("0123456789"*5 + "\n") * 1000 | file(fn) applyCl.splitFile(fn) applyCl.cat(fn, shape(0), keepNodeIds=True) | deref() That returns something like this (for a 2-node cluster, with 2 (node A) and 4 (node B) cpus respectively):: [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167], ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166], ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167]] Here, we're creating an initial file with 1000 lines. Then we'll split it up into 2 fragments: 334 lines and 667 lines and store them on the respective nodes. Then, on node A, we'll split the file up into 2 parts, each with 167 lines. On node B, we'll split the file up into 4 parts, each with around 166 lines. Then we'll schedule 6 processes total, each dealing with 166 lines. After all of that, results are collected together and returned. If you want to distinguish between different processes inside f, for example you want to write results into different files, you can do something like this:: dir_ = "~/repos/labs/k1lib/k1lib/cli/test" fn = f"{dir_}/applyCl.cat.data" applyCl.cmd(f"rm -r {dir_}/applyCl") # clear out old folders applyCl.cmd(f"mkdir -p {dir_}/applyCl") # creating folders # do processing on fn distributedly, then dump results into multiple files applyCl.cat(fn, ~aS(lambda idx, lines: lines | shape(0) | aS(dill.dumps) | file(f"{dir_}/applyCl/{idx}.pth")), includeId=True) | deref() # reading all files and summing them together None | applyCl.aS(lambda: ls(f"{dir_}/applyCl")) | ungroup() | applyCl(cat(text=False) | aS(dill.loads), pre=True) | cut(1) | toSum() .. admonition:: Simple mode There's also another mode that's activated whenever f is not specified that feels more like vanilla :class:`~inp.cat`. Say you have a file on a specific node:: nodeId = "7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d" fn = "~/ssd2/randomFile.txt" # -------------- file is on current node -------------- cat(fn) # returns iterator of lines inside the file fn | cat() # same thing as above # -------------- file is on remote node -------------- [nodeId, fn] | applyCl.cat() # returns iterator of lines of the file applyCl.cat([nodeId, fn]) # same thing nodeId | applyCl.cat(fn) # also same thing So yeah, there're lots of ways to just simply read a file on a remote node. Is it too much? Probably, but good thing is that you can pick any that's intuitive for you. Note that this mode is just for convenience only, for when you want to do exploratory analysis on a single remote file. To be efficient at bulk processing, use the normal mode instead. :param fn: file name :param f: function to execute in every process :param nodeIds: only read file from these nodes :param timeout: kills the processes if it takes longer than this amount of seconds :param pre: "preserve" mode, just like in :class:`applyCl`. Whether to keep the node id column or not :param multiplier: by default, each node will spawn as many process as there are cpus. 
Sometimes you want to spawn more process, change this to a higher number :param includeId: includes a unique id for this process (just normal integers from 0 to n) :param resolve: whether to resolve the remote objects or not
Convenience function to execute shell command on all nodes. Example:: applyCl.cmd("mkdir -p /some/folder") It returns [[nodeid1, output1], [nodeid2, output2]]. If you need more flexibility, fall back to :meth:`applyCl.aS` :param s: shell command to execute :param sudo: if True, will execute the command with sudo privileges. Will ask for password and then cache it internally for 5 minutes
Grabs the number of cpus available on this node
Convenience function for :meth:`balanceFile`. See docs over there.
Like :meth:`decommissionFile`, but works for distributed folders instead. :param nAs: list of node ids to migrate files away from :param maxSteps: limits the total number of optimization steps. Normally don't have to specify, but just here in case it runs for too long trying to optimize the folder structure :param audit: if True, just returns the file movements it's planning to do
Scans for files and folders in the specified folder for potential distributed files and folders. A distributed file is a file that exists on more than 1 node. A distributed folder is a folder that that exists on more than 1 node and does not have any shared children. Example:: applyCl.diskScan("~/ssd2") applyCl.diskScan("~/ssd2", True) The first line does not return anything, but will print out something like this: .. include:: ../literals/diskScan.rst While the second line will return a parseable data structure instead:: [[['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity', [4113489746, 7912834090, 4164314316]], ['/home/kelvin/ssd2/data/genome/go/release_geneontology_org', [2071645117, 4172737915, 2107005131]], ['/home/kelvin/ssd2/data/genome/RegulationFeatureActivity.backup', [568878496, 552888466, 600610083]], ['/home/kelvin/ssd2/data/genome/00-common_all.idx', [341738564, 671136833, 0]], ['/home/kelvin/ssd2/data/genome/genbank/ch1.dat.gz', [25356744, 0, 25356764]], ['/home/kelvin/ssd2/test', [136152, 273530, 136351]], ['/home/kelvin/ssd2/data/genome/genbank/ch1', [0, 0, 0]]], [['/home/kelvin/ssd2/data/genome/dummy.txt', [1101, 1101, 1101]]], [['/home/kelvin/ssd2/data/genome/00-All.vcf', [32737509360, 65475018903, 32737509588]], ['/home/kelvin/ssd2/data/genome/MotifFeatures/homo_sapiens.GRCh38.motif_features.gff', [13963854962, 27927709895, 13963854962]], ['/home/kelvin/ssd2/data/genome/00-common_all.vcf', [2353901811, 4707803470, 2353901831]]]] Remember that since an operating system usually have lots of shared files (like "~/.bashrc", for example), these might be mistaken as a distributed file. Make sure to only scan folders that you store data in, or else it'll take a long time to return. :param folder: the folder to scan through :param raw: whether to return raw data or display it out nicely
Downloads a file distributedly to a specified folder. Example:: url = "https://vim.kelvinho.org" fn = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.download" # file/folder name applyCl.download(url, fn) # will download distributedly and dump file fragments into the folder fn applyCl.download(url, fn, True) # same as above, but collects all fragments together, places it in fn in the current node, then deletes the temporary file fragments This only works if the server allows partial downloads btw. :param url: url to download :param folder: which folder to download parts into :param merge: whether to merge all of the fragments together into a single file in the current node or not :param timeout: timeout for each process :param chunkTimeout: timeout for each file chunk inside each process
Grabs the metadata object for the current node
Returns current node id
Returns a list of all node ids in the current cluster. Example:: applyCl.nodeIds() # returns something like ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', '1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068'] If you want to get nodes' metadata, then just use ray's builtin function ``ray.nodes()`` :param includeSelf: whether to include node id of the current process or not
Replicates a specific file in the current node to all the other nodes. Example:: applyCl.replicateFile("~/cron.log") Internally, this will read chunks of 100kB of the specified file and dump it incrementally to all other nodes, which has implications on performance. To increase or decrease it, check out :class:`~k1lib.cli.inp.cat`. This also means you can replicate arbitrarily large files around as long as you have the disk space for it, while ram size doesn't really matter :param fn: file name
After that, fire up JupyterLab on node 1 and import k1lib as usual (star import: from k1lib.imports import *). It will connect to the cluster automatically. Then all the commands should work. Let's go through them one by one:
applyCl.nodeId()
'82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9'
applyCl.nodeIds()
['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', '901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', '7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', '82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9']
applyCl.nodeIds(includeSelf=False)
['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', '901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', '7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8']
applyCl.cpu()
16
applyCl.meta()
{'NodeID': '82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 'Alive': True, 'NodeManagerAddress': '192.168.1.35', 'NodeManagerHostname': 'mint-2', 'NodeManagerPort': 40137, 'ObjectManagerPort': 38439, 'ObjectStoreSocketName': '/tmp/ray/session_2023-05-24_11-25-51_818813_1910501/sockets/plasma_store', 'RayletSocketName': '/tmp/ray/session_2023-05-24_11-25-51_818813_1910501/sockets/raylet', 'MetricsExportPort': 59721, 'NodeName': '192.168.1.35', 'alive': True, 'Resources': {'node:192.168.1.35': 1.0, 'object_store_memory': 9396654489.0, 'accelerator_type:G': 1.0, 'CPU': 16.0, 'GPU': 1.0, 'memory': 18793308980.0}}
All of these are just convenience functions, but they're all derived from ray.nodes():
ray.nodes() | aS(json.dumps, indent=2) | aS(fmt.pre) | viz.Scroll()
[ { "NodeID": "2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c", "Alive": true, "NodeManagerAddress": "192.168.1.53", "NodeManagerHostname": "mint-5", "NodeManagerPort": 43307, "ObjectManagerPort": 39553, "ObjectStoreSocketName": "/tmp/ray/session_2023-05-24_11-25-51_818813_1910501/sockets/plasma_store", "RayletSocketName": "/tmp/ray/session_2023-05-24_11-25-51_818813_1910501/sockets/raylet", "MetricsExportPort": 59718, "NodeName": "192.168.1.53", "alive": true, "Resources": { "CPU": 8.0, "object_store_memory": 9808564224.0, "memory": 22886649856.0, "node:192.168.1.53": 1.0 } }, { "NodeID": "901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599", "Alive": true, "NodeManagerAddress": "192.168.1.43", "NodeManagerHostname": "mint-4", "NodeManagerPort": 37343, "ObjectManagerPort": 41541, "ObjectStoreSocketName": "/tmp/ray/session_2023-05-24_11-25-51_818813_1910501/sockets/plasma_store", "RayletSocketName": "/tmp/ray/session_2023-05-24_11-25-51_818813_1910501/sockets/raylet", "MetricsExportPort": 61288, "NodeName": "192.168.1.43", "alive": true, "Resources": { "CPU": 8.0, "memory": 11157889434.0, "object_store_memory": 4781952614.0, "node:192.168.1.43": 1.0 } }, { "NodeID": "7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8", "Alive": true, "NodeManagerAddress": "192.168.1.57", "NodeManagerHostname": "mint-6", "NodeManagerPort": 36229, "ObjectManagerPort": 40691, "ObjectStoreSocketName": "/tmp/ray/session_2023-05-24_11-25-51_818813_1910501/sockets/plasma_store", "RayletSocketName": "/tmp/ray/session_2023-05-24_11-25-51_818813_1910501/sockets/raylet", "MetricsExportPort": 53206, "NodeName": "192.168.1.57", "alive": true, "Resources": { "CPU": 32.0, "memory": 93009602560.0, "object_store_memory": 39861258240.0, "node:192.168.1.57": 1.0 } }, { "NodeID": "82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9", "Alive": true, "NodeManagerAddress": "192.168.1.35", "NodeManagerHostname": "mint-2", "NodeManagerPort": 40137, "ObjectManagerPort": 38439, "ObjectStoreSocketName": "/tmp/ray/session_2023-05-24_11-25-51_818813_1910501/sockets/plasma_store", "RayletSocketName": "/tmp/ray/session_2023-05-24_11-25-51_818813_1910501/sockets/raylet", "MetricsExportPort": 59721, "NodeName": "192.168.1.35", "alive": true, "Resources": { "node:192.168.1.35": 1.0, "object_store_memory": 9396654489.0, "accelerator_type:G": 1.0, "CPU": 16.0, "GPU": 1.0, "memory": 18793308980.0 } } ]
applyCl.aS?
Signature: applyCl.aS(f, timeout: float = 8, resolve: bool = True, num_cpus=1) Docstring: Executes function f once for all node ids that are piped in. Example:: # returns [['1051da...', ['Desktop', 'Downloads']], ['7bb387...', ['Pictures', 'Music']]] applyCl.nodeIds() | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref() # also returns [['1051da...', ['Desktop', 'Downloads']], ['7bb387...', ['Pictures', 'Music']]] None | applyCl.aS(lambda: None | cmd("ls ~") | deref()) | deref() If you want to execute f for all nodes, you can pass in None instead. As a reminder, this kinda follows the same logic as the popular cli :class:`aS`, where f is executed once, hence the name "apply Single". Here, the meaning of "single" is different. It just means execute once for each node ids. :param f: main function to execute in each node. Not supposed to accept any arguments :param timeout: seconds to wait for job before raising an error :param resolve: whether to resolve the result or not. See main docs at :class:`applyCl` File: ~/anaconda3/envs/ray2/lib/python3.9/site-packages/k1lib-1.3.9.1-py3.9.egg/k1lib/cli/modifier.py Type: function
Simplest use case:
None | applyCl.aS(lambda: "returning some string value") | deref()
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 'returning some string value'], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 'returning some string value'], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 'returning some string value'], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 'returning some string value']]
Here, the function lambda: "returning some string value" is executed once on every node, hence the name aS, or "apply single" (the meaning is a bit different from the usual aS though). The result is a list of [node id, function result] pairs. Instead of passing in None (which will execute on all nodes), you can pass in a list of node ids so that it only executes on those nodes:
[
"2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c",
"901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599"
] | applyCl.aS(lambda: "returning some string value") | deref()
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 'returning some string value'], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 'returning some string value']]
That's pretty much it. Note that this is terrible for actual heavy processing, as only 1 process gets executed on each node, so if you have 3 nodes, each with 16 cores, then this will only run 3 processes. However, it's useful for general management tasks, and is a great starting point in a distributed pipeline.
applyCl.cmd?
Signature: applyCl.cmd(s: str, timeout: float = 8, sudo=False, nodeIds=None) Docstring: Convenience function to execute shell command on all nodes. Example:: applyCl.cmd("mkdir -p /some/folder") It returns [[nodeid1, output1], [nodeid2, output2]]. If you need more flexibility, fall back to :meth:`applyCl.aS` :param s: shell command to execute :param sudo: if True, will execute the command with sudo privileges. Will ask for password and then cache it internally for 5 minutes File: ~/anaconda3/envs/ray2/lib/python3.9/site-packages/k1lib-1.3.9.1-py3.9.egg/k1lib/cli/modifier.py Type: function
Sometimes, you just want to execute some shell code on all nodes. So instead of this:
None | applyCl.aS(lambda: None | cmd("uname -a") | deref()) | deref()
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', ['Linux mint-5 5.15.0-72-generic #79-Ubuntu SMP Wed Apr 19 08:22:18 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux']], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', ['Linux mint-4 5.15.0-72-generic #79-Ubuntu SMP Wed Apr 19 08:22:18 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux']], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', ['Linux mint-6 5.15.0-72-generic #79-Ubuntu SMP Wed Apr 19 08:22:18 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux']], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', ['Linux mint-2 5.15.0-71-generic #78-Ubuntu SMP Tue Apr 18 09:00:29 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux']]]
Which is rather annoying, with lots of boilerplate code, you can do this instead:
applyCl.cmd("uname -a")
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', ['Linux mint-5 5.15.0-72-generic #79-Ubuntu SMP Wed Apr 19 08:22:18 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux']], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', ['Linux mint-4 5.15.0-72-generic #79-Ubuntu SMP Wed Apr 19 08:22:18 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux']], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', ['Linux mint-6 5.15.0-72-generic #79-Ubuntu SMP Wed Apr 19 08:22:18 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux']], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', ['Linux mint-2 5.15.0-71-generic #78-Ubuntu SMP Tue Apr 18 09:00:29 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux']]]
This is pretty useful. You can now run automated upgrades, status checks and much more on all nodes at once, freeing up time for you to do other things.
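For example, here's the kind of housekeeping I have in mind (the exact shell commands are just illustrative):
applyCl.cmd("df -h /")                      # disk usage on every node
applyCl.cmd("uptime")                       # load average on every node
applyCl.cmd("apt-get update", sudo=True)    # runs with sudo; asks for the password once and caches it for 5 minutes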
This is another major mode of operation besides the familiar apply()-like one: a mode that tries to "preserve" the node id (I know, I'm bad at naming things). Let's see an example:
[
["901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599", 56],
["2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c", 12],
["901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599", 89],
["901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599", 27],
] | applyCl(lambda x: x + 3, pre=True) | deref()
[['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 59], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 15], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 92], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 30]]
Just a reminder, normal mode looks like this:
[56, 12, 89, 27] | applyCl(lambda x: x + 3) | deref()
[59, 15, 92, 30]
By specifying pre=True, you're saying that you want to execute the function with a particular parameter (say 56) on a particular node (901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599). The answer given back will be a list of [node id, result] pairs. Let's consider a more detailed setup:
base = "/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments"
applyCl.cmd(f"rm -rf {base}/preserve") # wipe everything
applyCl.cmd(f"mkdir -p {base}/preserve") # create folders for distributed files
tasks = applyCl.nodeIds() | insertIdColumn() | ~apply(lambda x, y: [y]*(x+1)) | joinStreams() | insertIdColumn(begin=False) | apply((op()+3)*4, 1) | deref()
tasks
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 12], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 16], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 20], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 24], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 28], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 36], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 40], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 44], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 48]]
tasks | applyCl(lambda x: "file contents\nLine 1\nLine 2: " + "x"*x | file(f"{base}/preserve/{x}.txt"), pre=True) | ignore()
applyCl.cmd(f"ls {base}/preserve")
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', ['12.txt']], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', ['16.txt', '20.txt']], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', ['24.txt', '28.txt', '32.txt']], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', ['36.txt', '40.txt', '44.txt', '48.txt']]]
Here, let's assume that through some process, we're able to write the processed results to several files on several nodes. Problem is, those files don't share the same names, and each node doesn't have the same number of files. What to do? Well, we can first list out all the files on each node:
a = None | applyCl.aS(lambda: ls(f"{base}/preserve")) | deref(); a
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', ['/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/12.txt']], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', ['/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/20.txt', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/16.txt']], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', ['/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/32.txt', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/24.txt', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/28.txt']], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', ['/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/44.txt', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/40.txt', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/36.txt', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/48.txt']]]
Then let's ungroup it:
b = a | ungroup() | deref(); b
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/12.txt'], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/20.txt'], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/16.txt'], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/32.txt'], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/24.txt'], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/28.txt'], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/44.txt'], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/40.txt'], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/36.txt'], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/preserve/48.txt']]
It is now in a very nice format, ready to be fed into applyCl
right away:
b | applyCl(cat() | join(" | "), pre=True) | deref()
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 'file contents | Line 1 | Line 2: xxxxxxxxxxxx'], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 'file contents | Line 1 | Line 2: xxxxxxxxxxxxxxxxxxxx'], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 'file contents | Line 1 | Line 2: xxxxxxxxxxxxxxxx'], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 'file contents | Line 1 | Line 2: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 'file contents | Line 1 | Line 2: xxxxxxxxxxxxxxxxxxxxxxxx'], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 'file contents | Line 1 | Line 2: xxxxxxxxxxxxxxxxxxxxxxxxxxxx'], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 'file contents | Line 1 | Line 2: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 'file contents | Line 1 | Line 2: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 'file contents | Line 1 | Line 2: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 'file contents | Line 1 | Line 2: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx']]
So as you can see, this fires up 1 process on the 1st node, 2 processes on the 2nd node, 3 on the 3rd and 4 on the 4th. The processes read the file contents and return them to the requesting node, where they're displayed. This is to show the utility of the preserve mode.
applyCl.replicateFile?
Signature: applyCl.replicateFile(fn: str, nodeIds=None) Docstring: Replicates a specific file in the current node to all the other nodes. Example:: applyCl.replicateFile("~/cron.log") Internally, this will read chunks of 100kB of the specified file and dump it incrementally to all other nodes, which has implications on performance. To increase or decrease it, check out :class:`~k1lib.cli.inp.cat`. This also means you can replicate arbitrarily large files around as long as you have the disk space for it, while ram size doesn't really matter :param fn: file name File: ~/anaconda3/envs/ray2/lib/python3.9/site-packages/k1lib-1.3.9.1-py3.9.egg/k1lib/cli/modifier.py Type: function
Let's say that you have some file on the current node that's pretty important, and you want all nodes to have a local copy of it. You can use this function to do exactly that:
applyCl.cmd(f"rm -rf {base}/replicateFile") # wipe everything
None | cmd(f"mkdir -p {base}/replicateFile") | deref() # creates new folder only on this node
["0123456789", "abcdefghij"] | file(f"{base}/replicateFile/abc.txt") # prepare file on this node
applyCl.cmd(f"ls {base} | grep replicateFile") | cut(1) | deref() | aS(print) # list all files inside the folder for all nodes
applyCl.replicateFile(f"{base}/replicateFile/abc.txt") # replicates file to other nodes
applyCl.cmd(f"ls {base}/replicateFile") | cut(1) | deref() | aS(print) # list all files inside the folder for all nodes
[[], [], [], ['replicateFile']]
(remoteF pid=647760, ip=192.168.1.57)
(remoteF pid=647760, ip=192.168.1.57) Error encountered:
(remoteF pid=647760, ip=192.168.1.57)
(remoteF pid=647760, ip=192.168.1.57) rm: cannot remove '/home/kelvin/repos/labs/mlexps/other/27-multi-node/experiments/replicateFile/abc.txt': No such file or directory
[['abc.txt'], ['abc.txt'], ['abc.txt'], ['abc.txt']]
This shows that all the files replicated just fine. Let's check their contents:
None | applyCl.aS(lambda: cat(f"{base}/replicateFile/abc.txt") | deref()) | deref()
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', ['0123456789', 'abcdefghij']], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', ['0123456789', 'abcdefghij']], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', ['0123456789', 'abcdefghij']], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', ['0123456789', 'abcdefghij']]]
Yep. Wonderful. My particular use case for this: when I'm aligning genome sequencing reads to a reference genome, I need the reference genome to be available on all nodes, so I can quickly fire up some 3rd-party tool and have it access a fast, local file.
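In code, that use case might look roughly like this (the reference path and the bwa tool are just placeholders for whatever aligner you use):
ref = "~/data/genome/ref.fa"       # hypothetical reference genome, currently only on this node
applyCl.replicateFile(ref)          # now every node has its own local copy
applyCl.cmd(f"bwa index {ref}")     # e.g. index it locally on every node (bwa is a placeholder tool)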
applyCl.balanceFile?
Signature: applyCl.balanceFile( fn: str, nAs: List[str] = None, nBs: List[str] = None, rS=None, ) Docstring: Splits a specified file in node nAs and dumps other parts to nodes nBs. Example:: applyCl.balanceFile("~/cron.log") This will split the big files up into multiple segments (1 for each node). Then for each segment, it will read through it chunk by chunk into memory, and then deposits it into the respective nodes. Finally, it truncates the original files down to its segment boundary. The main goal of this is so that you can analyze a single big (say 200GB) file quickly. If that file is on a single node, then it will take forever, even with :class:`applyMp`. So splitting things up on multiple nodes will make analyzing it a lot faster. There's also the function :meth:`balanceFolder`, which has the opposite problem of having lots of small (say 100MB) files. So it will try to move files around (keeping them intact in the meantime) to different nodes so that the folder size ratio is roughly proportional to the cpu count. The exact split rule depends on the number of CPUs of each node. Best to see an example:: Command: applyCl.balanceFile("~/cron.log") Verbose command: applyCl.balanceFile("~/cron.log", ["1"], ["1", "2", "3", "4", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 52 0 0 0 0 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 This also works if you have files on existing nodes already, and are upgrading the cluster:: Command: applyCl.balanceFile("~/cron.log") Verbose command: applyCl.balanceFile("~/cron.log", ["1", "5"], ["1", "2", "3", "4", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 26 0 0 26 0 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 If you want to move files out of a node when decommissioning them, you can do something like this:: Command: applyCl.decommission("~/cron.log", ["3", "4"]) Verbose command: applyCl.balanceFile("~/cron.log", ["1", "2", "3", "4", "5"], ["1", "2", "5"]) ----------- Before ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 8 12 16 8 8 ----------- After ----------- Node: 1 2 3 4 5 Cpu: 8 12 16 8 8 Size (GB): 15 22 0 0 15 Remember that the node ids "1", etc. is for illustrative purposes only. You should get real node ids from :meth:`nodeIds`. Why is the file size proportional to the number of cores on each node? Well, if you have more cores, you should be able to process more, so as to make everything balanced, right? Again, this means that you can split arbitrarily large files as long as you have the disk space for it, ram size is not a concern. How does this perform? Not the best in the world if you don't have a lot of nodes. With sata 3 ssds, 750MB/s ethernet, I got transfer speeds of roughly 100MB/s. This should increase as you have more nodes based on the code structure, but I haven't tested it yet. Can it be faster? Definitely. Am I willing to spend time optimizing it? No. :param fn: file name :param nAs: node ids that currently stores the file. If not specified, try to detect what nodes the file exists in :param nBs: node ids that will store the file after balancing everything out. 
If not specified, will take all available nodes :param rS: :class:`~k1lib.cli.inp.refineSeek` instance, if you need more fine-grained control over section boundaries so as to not make everything corrupted File: ~/anaconda3/envs/ray2/lib/python3.9/site-packages/k1lib-1.3.9.1-py3.9.egg/k1lib/cli/modifier.py Type: function
This one is pretty straightforward. There's an example of creating and processing distributed files above somewhere, in the programming model section.
applyCl.cat?
Signature:
applyCl.cat(
    fn: str = None,
    f: Callable = None,
    nodeIds=None,
    timeout: float = 60,
    pre: bool = False,
    multiplier: int = 1,
    includeId: bool = False,
    resolve: bool = True,
)
Docstring:
Reads a file distributedly, does some operation on them, collects and returns all of the data together. Example::

    fn = "~/repos/labs/k1lib/k1lib/cli/test/applyCl.cat.data"
    ("0123456789"*5 + "\n") * 1000 | file(fn)
    applyCl.splitFile(fn)
    applyCl.cat(fn, shape(0), keepNodeIds=True) | deref()

That returns something like this (for a 2-node cluster, with 2 (node A) and 4 (node B) cpus respectively)::

    [['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167],
     ['7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d', 167],
     ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166],
     ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167],
     ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 166],
     ['1051dafd2b0dac13561c46fe052f561400592f0723df2cd746a41068', 167]]

Here, we're creating an initial file with 1000 lines. Then we'll split it up into 2 fragments: 334 lines and 667 lines and store them on the respective nodes. Then, on node A, we'll split the file up into 2 parts, each with 167 lines. On node B, we'll split the file up into 4 parts, each with around 166 lines. Then we'll schedule 6 processes total, each dealing with 166 lines. After all of that, results are collected together and returned.

If you want to distinguish between different processes inside f, for example you want to write results into different files, you can do something like this::

    dir_ = "~/repos/labs/k1lib/k1lib/cli/test"
    fn = f"{dir_}/applyCl.cat.data"
    applyCl.cmd(f"rm -r {dir_}/applyCl")    # clear out old folders
    applyCl.cmd(f"mkdir -p {dir_}/applyCl") # creating folders
    # do processing on fn distributedly, then dump results into multiple files
    applyCl.cat(fn, ~aS(lambda idx, lines: lines | shape(0) | aS(dill.dumps) | file(f"{dir_}/applyCl/{idx}.pth")), includeId=True) | deref()
    # reading all files and summing them together
    None | applyCl.aS(lambda: ls(f"{dir_}/applyCl")) | ungroup() | applyCl(cat(text=False) | aS(dill.loads), pre=True) | cut(1) | toSum()

.. admonition:: Simple mode

    There's also another mode that's activated whenever f is not specified that feels more like vanilla :class:`~inp.cat`. Say you have a file on a specific node::

        nodeId = "7bb387b2920694abe9f7d2a2ed939b6d31843faf91d174d0221e871d"
        fn = "~/ssd2/randomFile.txt"

        # -------------- file is on current node --------------
        cat(fn)                      # returns iterator of lines inside the file
        fn | cat()                   # same thing as above
        # -------------- file is on remote node --------------
        [nodeId, fn] | applyCl.cat() # returns iterator of lines of the file
        applyCl.cat([nodeId, fn])    # same thing
        nodeId | applyCl.cat(fn)     # also same thing

    So yeah, there're lots of ways to just simply read a file on a remote node. Is it too much? Probably, but good thing is that you can pick any that's intuitive for you. Note that this mode is just for convenience only, for when you want to do exploratory analysis on a single remote file. To be efficient at bulk processing, use the normal mode instead.

:param fn: file name
:param f: function to execute in every process
:param nodeIds: only read file from these nodes
:param timeout: kills the processes if it takes longer than this amount of seconds
:param pre: "preserve" mode, just like in :class:`applyCl`. Whether to keep the node id column or not
:param multiplier: by default, each node will spawn as many process as there are cpus. Sometimes you want to spawn more process, change this to a higher number
:param includeId: includes a unique id for this process (just normal integers from 0 to n)
:param resolve: whether to resolve the remote objects or not
File:      ~/anaconda3/envs/ray2/lib/python3.9/site-packages/k1lib-1.3.9.1-py3.9.egg/k1lib/cli/modifier.py
Type:      function
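The docstring is fairly dense, so here's a tiny annotated skeleton of a typical call. It only restates the parameters above (nothing new), and shape(0) is just a placeholder for whatever f you actually want to run on each section:
applyCl.cat(
    fn,              # path of the file, as seen from each node
    shape(0),        # f: runs once per section; here it just counts that section's lines
    nodeIds=None,    # or a list of node ids, to only read the file on those nodes
    multiplier=1,    # sections per node = number of cpus * multiplier
    includeId=False, # True => f receives [process id, lines] instead of just lines
    pre=False,       # True ("preserve") => keep the node id column in the results
) | deref()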
This one is also relatively straightforward. Let's run the code in the example:
dir_ = "~/repos/labs/k1lib/k1lib/cli/test"
fn = f"{dir_}/applyCl.cat.data"
applyCl.cmd(f"rm -r {dir_}/applyCl") # clear out old folders
applyCl.cmd(f"mkdir -p {dir_}/applyCl") # creating folders
# do processing on fn distributedly, then dump results into multiple files
applyCl.cat(fn, ~aS(lambda idx, lines: lines | shape(0) | aS(dill.dumps) | file(f"{dir_}/applyCl/{idx}.pth")), includeId=True) | deref()
# reading all files and summing them together
None | applyCl.aS(lambda: ls(f"{dir_}/applyCl")) | ungroup(single=True, begin=True) | applyCl(cat(text=False) | aS(dill.loads), pre=True) | cut(1) | toSum()
2060
This output is a little mysterious, so let's run the last 2 lines step by step.
Let's see one of the files:
cat(fn) | headOut(); cat(fn) | shape()
01234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789
(500, 50)
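Where did this file come from? The post doesn't show that step, so this is an assumption on my part, but one plausible way to create the same 500-line, 50-character-per-line file on every node, reusing only pieces we've already seen, would be:
# assumption: run the same file-writing lambda on every node via applyCl.aS
None | applyCl.aS(lambda: ("0123456789"*5 + "\n") * 500 | file(fn)) | deref()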
Sizes of these files?
None | applyCl.aS(lambda: cat(fn) | shape()) | deref()
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', [500, 50]], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', [500, 50]], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', [500, 50]], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', [500, 50]]]
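So every node holds the same 500 x 50 copy, i.e. 2000 lines cluster-wide. A quick way to confirm that total, using only pipes that already appear in this post:
None | applyCl.aS(lambda: cat(fn) | shape(0)) | cut(1) | toSum() # should give 2000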
applyCl.cmd(f"ls -lah {dir_} | grep applyCl.cat.data") | cut(1) | apply(join("\n") | aS(fmt.pre)) | aS(viz.Carousel)
-rw-rw-r-- 1 kelvin kelvin 25K May 11 14:49 applyCl.cat.data
-rw-rw-r-- 1 kelvin kelvin 25K May 11 14:49 applyCl.cat.data
-rw-rw-r-- 1 kelvin kelvin 25K May 11 14:49 applyCl.cat.data
-rw-rw-r-- 1 kelvin kelvin 25K May 11 14:49 applyCl.cat.data
Let's see what applyCl.cat() does:
applyCl.cat(fn, shape()) | batched(5, True) | deref()
[[[63, 50], [63, 50], [64, 50], [63, 50], [64, 50]], [[63, 50], [64, 50], [63, 50], [63, 50], [63, 50]], [[64, 50], [63, 50], [64, 50], [63, 50], [64, 50]], [[63, 50], [16, 50], [17, 50], [16, 50], [17, 50]], [[17, 50], [16, 50], [17, 50], [16, 50], [17, 50]], [[17, 50], [16, 50], [17, 50], [17, 50], [16, 50]], [[17, 50], [16, 50], [17, 50], [17, 50], [16, 50]], [[17, 50], [17, 50], [16, 50], [17, 50], [16, 50]], [[17, 50], [17, 50], [16, 50], [17, 50], [17, 50]], [[16, 50], [17, 50], [16, 50], [32, 50], [32, 50]], [[32, 50], [32, 50], [33, 50], [32, 50], [32, 50]], [[32, 50], [33, 50], [32, 50], [32, 50], [32, 50]], [[33, 50], [32, 50], [32, 50], [32, 50]]]
The file on each node (500 lines) is split into as many sections as that node has cpus. The first two nodes have 8 cpus each, so their copies are split into 8 sections of ~63 lines; the third node has 32 cpus, so it gets 32 sections of ~16 lines; the fourth has 16 cpus, giving 16 sections of ~32 lines. Each section is then handled by its own process on that node.
As you can see, within each node every core handles roughly the same number of lines, which means we make the best use of our hardware and all of a node's processes should finish at roughly the same time. You can change the multiplier parameter to split the file into even smaller parts:
applyCl.cat(fn, shape(0), multiplier=2) | batched(7, True) | deref()
[[32, 32, 32, 32, 33, 32, 32], [32, 33, 32, 32, 32, 33, 32], [32, 32, 32, 32, 32, 32, 33], [32, 32, 32, 33, 32, 32, 32], [33, 32, 32, 32, 8, 9, 9], [9, 9, 8, 9, 9, 9, 9], [8, 9, 9, 9, 9, 8, 9], [9, 9, 9, 9, 8, 9, 9], [9, 9, 8, 9, 9, 9, 9], [8, 9, 9, 9, 9, 9, 8], [9, 9, 9, 9, 8, 9, 9], [9, 9, 8, 9, 9, 9, 9], [9, 8, 9, 9, 9, 9, 8], [9, 9, 9, 9, 8, 16, 17], [16, 17, 17, 16, 17, 16, 17], [17, 16, 17, 17, 16, 17, 16], [17, 17, 16, 17, 17, 16, 17], [16, 17, 17, 16, 17, 17, 16], [17, 16]]
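To make the splitting concrete, here's a rough sketch in plain Python (not the library's actual code) of how a node's line count gets carved into cpus x multiplier near-equal sections. The real counts above come out slightly different (63/64 instead of 62/63), presumably because the library splits on byte offsets rather than exact line counts, but the idea is the same:
def split_counts(n_lines, n_cpus, multiplier=1):
    # number of sections this node will be asked to process
    n_chunks = n_cpus * multiplier
    base, extra = divmod(n_lines, n_chunks)
    # the first `extra` sections get one extra line, so sizes differ by at most 1
    return [base + (1 if i < extra else 0) for i in range(n_chunks)]

split_counts(500, 8)      # 8-cpu node:  [63, 63, 63, 63, 62, 62, 62, 62]
split_counts(500, 32)     # 32-cpu node: twenty 16s and twelve 15s
split_counts(500, 16, 2)  # 16-cpu node with multiplier=2: 32 sections of 15-16 lines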
Wonderful, isn't it? But after you've done the analysis (counting how many lines are in each segment of each file on each node), how would you store the results to disk? You can have applyCl.cat insert a unique id for each process:
applyCl.cat(fn, ~aS(lambda idx, lines: [idx, lines | shape(0)]), includeId=True) | batched(5, True) | deref()
[[[0, 63], [1, 63], [2, 64], [3, 63], [4, 64]], [[5, 63], [6, 64], [7, 63], [8, 63], [9, 63]], [[10, 64], [11, 63], [12, 64], [13, 63], [14, 64]], [[15, 63], [16, 16], [17, 17], [18, 16], [19, 17]], [[20, 17], [21, 16], [22, 17], [23, 16], [24, 17]], [[25, 17], [26, 16], [27, 17], [28, 17], [29, 16]], [[30, 17], [31, 16], [32, 17], [33, 17], [34, 16]], [[35, 17], [36, 17], [37, 16], [38, 17], [39, 16]], [[40, 17], [41, 17], [42, 16], [43, 17], [44, 17]], [[45, 16], [46, 17], [47, 16], [48, 32], [49, 32]], [[50, 32], [51, 32], [52, 33], [53, 32], [54, 32]], [[55, 32], [56, 33], [57, 32], [58, 32], [59, 32]], [[60, 33], [61, 32], [62, 32], [63, 32]]]
Now, each process is given not just an iterator over its section's lines, but [process id, lines]. You can use that process id as the file name to save the analysis output into:
applyCl.cat(fn, ~aS(lambda idx, lines: lines | shape(0) | aS(dill.dumps) | file(f"{dir_}/applyCl/{idx}.pth")), includeId=True) | batched(2, True) | display(None)
/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/0.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/1.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/2.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/3.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/4.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/5.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/6.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/7.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/8.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/9.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/10.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/11.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/12.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/13.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/14.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/15.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/16.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/17.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/18.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/19.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/20.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/21.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/22.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/23.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/24.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/25.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/26.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/27.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/28.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/29.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/30.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/31.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/32.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/33.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/34.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/35.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/36.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/37.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/38.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/39.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/40.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/41.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/42.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/43.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/44.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/45.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/46.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/47.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/48.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/49.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/50.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/51.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/52.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/53.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/54.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/55.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/56.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/57.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/58.pth 
/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/59.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/60.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/61.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/62.pth /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/63.pth
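If the pipe syntax in that cell looks cryptic, here's roughly what each process is doing, spelled out in plain Python. This is a sketch of the same steps, not what the library literally executes, and the out_dir default is a hypothetical example path:
import dill

def per_process(idx, lines, out_dir="/tmp/applyCl"):  # out_dir: hypothetical example path
    count = sum(1 for _ in lines)       # lines | shape(0)
    blob  = dill.dumps(count)           # aS(dill.dumps)
    path  = f"{out_dir}/{idx}.pth"
    with open(path, "wb") as f:         # file(f"{dir_}/applyCl/{idx}.pth")
        f.write(blob)
    return path                         # the returned path is what gets displayed above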
Wonderful. The output data files:
applyCl.cmd(f"ls -la {dir_}/applyCl") | cut(1) | apply(join("\n") | aS(fmt.pre)) | aS(viz.Carousel)
total 40 drwxrwxr-x 2 kelvin kelvin 4096 May 24 16:07 . drwxrwxr-x 8 kelvin kelvin 4096 May 24 16:07 .. -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 0.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 1.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 2.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 3.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 4.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 5.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 6.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 7.pth
total 40 drwxrwxr-x 2 kelvin kelvin 4096 May 24 16:07 . drwxrwxr-x 8 kelvin kelvin 4096 May 24 16:07 .. -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 10.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 11.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 12.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 13.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 14.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 15.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 8.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 9.pth
total 136 drwxrwxr-x 2 kelvin kelvin 4096 May 24 16:07 . drwxrwxr-x 8 kelvin kelvin 4096 May 24 16:07 .. -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 16.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 17.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 18.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 19.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 20.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 21.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 22.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 23.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 24.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 25.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 26.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 27.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 28.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 29.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 30.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 31.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 32.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 33.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 34.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 35.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 36.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 37.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 38.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 39.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 40.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 41.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 42.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 43.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 44.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 45.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 46.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 47.pth
total 72 drwxrwxr-x 2 kelvin kelvin 4096 May 24 16:07 . drwxrwxr-x 8 kelvin kelvin 4096 May 24 16:07 .. -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 48.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 49.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 50.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 51.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 52.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 53.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 54.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 55.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 56.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 57.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 58.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 59.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 60.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 61.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 62.pth -rw-rw-r-- 1 kelvin kelvin 5 May 24 16:08 63.pth
Reading all generated files:
a = None | applyCl.aS(lambda: ls(f"{dir_}/applyCl")) | deref(); a
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', ['/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/5.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/0.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/2.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/7.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/6.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/1.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/4.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/3.pth']], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', ['/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/13.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/9.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/15.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/11.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/14.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/8.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/12.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/10.pth']], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', ['/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/26.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/35.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/28.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/34.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/24.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/39.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/22.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/41.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/33.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/27.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/21.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/46.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/31.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/43.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/18.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/16.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/29.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/30.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/40.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/19.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/47.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/17.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/42.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/25.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/45.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/32.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/38.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/37.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/20.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/36.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/44.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/23.pth']], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', ['/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/54.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/60.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/62.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/51.pth', 
'/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/61.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/48.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/59.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/57.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/53.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/55.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/52.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/56.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/49.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/50.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/63.pth', '/home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/58.pth']]]
Ungrouping them so that 1 process can access 1 file:
b = a | ungroup(single=True, begin=True) | deref()
b | display(20)
2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/5.pth
2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/0.pth
2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/2.pth
2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/7.pth
2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/6.pth
2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/1.pth
2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/4.pth
2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/3.pth
901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/13.pth
901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/9.pth
901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/15.pth
901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/11.pth
901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/14.pth
901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/8.pth
901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/12.pth
901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/10.pth
7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/26.pth
7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/35.pth
7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/28.pth
7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8 /home/kelvin/repos/labs/k1lib/k1lib/cli/test/applyCl/34.pth
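Conceptually (a plain-Python sketch, not the library's implementation), that ungroup call just pairs each node id with each of its paths:
def ungroup_sketch(groups):
    # [[nodeId, [path1, path2, ...]], ...] -> [[nodeId, path1], [nodeId, path2], ...]
    for node_id, paths in groups:
        for path in paths:
            yield [node_id, path]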
This is a form suitable to give to applyCl in preserve mode. Let's see the contents of 1 file first:
ls(f"{dir_}/applyCl") | item() | cat(text=False) | aS(dill.loads)
32
Just a single number. Grabbing all numbers from all files from all nodes:
c = b | applyCl(cat(text=False) | aS(dill.loads), pre=True) | deref(); c
[['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 63], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 63], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 64], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 63], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 64], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 63], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 64], ['2fd74a5de08d96d323420c575ae514fdcb58f2987b7b621bfde3485c', 63], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 63], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 63], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 63], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 63], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 64], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 63], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 64], ['901f167e6a59406e4accef23431a96ebfae6d9cd673a6c0881a92599', 64], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 17], ['7c0fdef17d915b3de66341e319982d18ad990df02210502232a532c8', 16], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 33], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], 
['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 33], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 33], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32], ['82db339ed40a7c0bdca829a9a50bd789fd3496dd115d12100ceda4d9', 32]]
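For reference, here's roughly what each worker did for its [nodeId, path] row in that last pipeline, again as a plain-Python sketch rather than the library's actual code:
import dill

def read_count(node_id, path):
    with open(path, "rb") as f:         # cat(text=False)
        count = dill.loads(f.read())    # aS(dill.loads)
    return [node_id, count]             # pre=True keeps the node id column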
Summing all of them together:
c | cut(1) | toSum()
2060
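Which matches the number we got at the start of this section. Of course, if all you needed were the counts themselves and not the intermediate .pth files, you could presumably skip the dump/load round trip and sum the per-section counts directly, using only calls we've already seen:
applyCl.cat(fn, shape(0)) | toSum() # count lines per section on every node, then sum everything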