from k1lib.imports import *
Tries to build a complete system where I can upload whole articles and journals, "index" them using GPT embeddings, look up relevant paragraphs, then use those as context to answer questions. Pretty much what I've been doing for the last 3 notebooks, but built into a complete and relatively useful tool that I can use in my daily life.
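The core loop is simple: embed every stored paragraph once, embed the incoming question, rank paragraphs by similarity, then feed the top few into a language model as context. Here's a minimal sketch of that idea in plain numpy; `embed_fn` and `llm_fn` are stand-ins for the OpenAI calls defined below, not the actual implementation used in this notebook:
#notest
import numpy as np
def retrieveAndAnswer(question, paragraphs, embed_fn, llm_fn, k=3):
    P = np.array([embed_fn(p) for p in paragraphs]) # (n, d) matrix of paragraph embeddings
    q = np.array(embed_fn(question))                # (d,) question embedding
    sims = P @ q # ada-002 embeddings are unit-length, so a dot product behaves like cosine similarity
    topk = sims.argsort()[::-1][:k]
    context = "\n\n".join(paragraphs[i] for i in topk)
    return llm_fn(f"Context:\n{context}\n\nQuestion: {question}\nAnswer:")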
What's up with the Raptor rocket engine? Nothing. I just didn't have any images for the thumbnail.
# serve
base = "~/ssd/data/knowledge-base"
import openai; openai.api_key = k1.OPENAI_KEY
def complete(text, model="text-davinci-003"): return requests.post("https://api.openai.com/v1/completions", json={"prompt": text, "model": model, "max_tokens": 1000}, headers={"Authorization": f"Bearer {k1.OPENAI_KEY}"})
def _embed(text): return requests.post("https://api.openai.com/v1/embeddings", json={"input": text, "model": "text-embedding-ada-002"}, headers={"Authorization": f"Bearer {k1.OPENAI_KEY}"})
def embed(text):
    attempts = 0
    with k1.timer() as t:
        while True: # keep retrying until the embedding request actually goes through
            try:
                res = _embed(text); attempts += 1
                if res.ok: break
            except: pass
    return text, res.json()["data"][0]["embedding"], attempts, t()
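Quick smoke test of the embedding helper (illustrative, not actually run here): it returns the original text, the embedding vector, how many attempts the request took, and the elapsed time.
#notest
text, vec, attempts, secs = embed("Hello world")
len(vec) # 1536: text-embedding-ada-002 vectors have 1536 dimensions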
# serve
def collect(lines, chars=1000):
    ans = []; s = ""
    for line in lines:
        s += f"{line}\n"
        if len(s) > chars: ans.append(s); s = ""
    ans.append(s); return ans
def deposit(text:str, category:str="wbw", name:str="", password:str="") -> serve.html():
    if password != k1.MLEXPS_PW: return "Wrong or empty password"
    category = category or "default"; name = name or f"{round(time.time())}"
    os.system(f"mkdir -p {base}/cats/{category}"); fn = f"{base}/cats/{category}/{name}"
    with k1.timer() as t: collect(text.split("\n")) | op().strip("\n").all() | filt(op()) | insertIdColumn() | apply(insert(name) | insert(category)) | ~apply(lambda _cat, name, idx, para: [_cat, name, idx, para, (embed(para)[1] | toTensor() | aS(np.array))]) | aS(list) | aS(dill.dumps) | file(fn)
    return f"<div>Success, dumped to {fn}. Total time taken: {t()}</div>"
class cache:
    """Caches the result of gen() for `timeout` seconds, so the knowledge base isn't reloaded from disk on every request"""
    def __init__(self, gen, timeout=60): self.gen = gen; self.timeout = timeout; self.last = 0
    def __call__(self):
        if (time.time()-self.last) > self.timeout: self.last = time.time(); self.value = self.gen()
        return self.value
def loadAll(): return ls(f"{base}/cats") | apply(ls() | apply(cat(text=False) | aS(dill.loads))) | joinStreams(2) | cut(0, 1, 2, 3) & (cut(4) | toTensor().all() | aS(list) | aS(torch.stack)) | deref()
store = cache(loadAll)
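For reference, a quick sanity check of how collect chunks lines together (made-up inputs): a chunk grows line by line until it crosses the chars threshold, and whatever is left over at the end becomes the final chunk.
#notest
chunks = collect(["a"*600, "b"*600, "c"*100], chars=1000)
[len(c) for c in chunks] # [1202, 101]: the first chunk closes once it exceeds 1000 characters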
# serve
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
def lookupWiki(term:str, password):
    if password != k1.MLEXPS_PW: return "Wrong or empty password"
    base = "~/ssd/data/knowledge-base/cats/wiki"
    browser = webdriver.Chrome(); browser.get("https://www.google.com"); time.sleep(1)
    try: a = browser.find_elements(by=By.CSS_SELECTOR, value="input") | filt(op().get_attribute("type") == "text") | item()
    except: a = browser.find_elements(by=By.CSS_SELECTOR, value="textarea") | item()
    a.send_keys(f"{term} wiki"); a.submit(); availableNames = ls(base) | op().split("/")[-1].all() | aS(set)
    links = browser.find_elements(by=By.CSS_SELECTOR, value="a") | op().get_attribute("href").all() | filt(op()) | filt(op().startswith("https://en.wikipedia.org")) | op().split("#")[0].all() | aS(set) | deref()
    names = links | op().split("/wiki/")[1].all() | deref()
    if [names, availableNames] | union() | aS(len) == len(availableNames): res = "No new articles to index"
    else:
        links, names = [links, names] | transpose() | ~inSet(availableNames, 1) | transpose() # remove links from articles already indexed
        res = f"{len(links)} total wikipedia articles to index:\n\n"
        for i, (link, name) in [links, names] | transpose() | aS(enumerate):
            browser.get(link); res += f"{i}. Indexing {name}... "
            text = browser.find_element(by=By.CSS_SELECTOR, value="body").get_attribute("innerText")
            with k1.timer() as t: deposit(text, "wiki", name, password)
            res += f" Took {t():.2f} seconds\n"
    browser.close(); return res.replace("\n", "<br>")
def lookupWebsite(url:str, password) -> serve.html():
    if password != k1.MLEXPS_PW: return "Wrong or empty password"
    base = "~/ssd/data/knowledge-base/cats"; browser = webdriver.Chrome()
    # e.g. "https://www.nature.com/articles/s41586-020-2528-x" -> category "www.nature.com", name "articles-s41586-020-2528-x"
    category, name = url.split("://")[1].split("/") | head(1).split() | item() + join("-")
    if os.path.exists(os.path.expanduser(f"{base}/{category}/{name}")): return "Already indexed this page. Skipping..."
    browser.get(url); text = browser.find_element(by=By.CSS_SELECTOR, value="body").get_attribute("innerText")
    with k1.timer() as t: deposit(text, category, name, password)
    browser.close(); return f"Article indexed in {t():.2f} seconds"
# serve
def turbo(question, context=None):
    if context: ctx = f"I'm going to provide you with some context: ------ context begins ------\n\n{context}\n\n------ context finishes ------\n\n"
    else: ctx = "(no context was provided, try to answer this on your own)\n\n"
    return openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "system", "content": "You are a helpful assistant that reads some context information and answers the user's questions."},
                  {"role": "user", "content": f"{ctx}Question: {question}"},]
    ) | aS(json.dumps) | aS(json.loads) | op()["choices"][0]["message"]["content"]
def normal(question, model, context=None):
    if context: ctx = f"You are given some context information, and after that you will be asked a question:\n\n{context}\n\n\n\n "
    else: ctx = ""
    return complete(f"{ctx}Question: {question}\n\n Answer:", model).text | aS(json.loads) | op()["choices"][0]["text"]
def question(question:str, category:str="", model="gpt-3.5-turbo"):
    records, feats = store(); feat = embed(question)[1] | toTensor()
    if len(category) > 0: records, feats = [records, feats[:,None]] | transpose() | joinStreams().all() | filt(op() == category, 0) | deref() | cut(0, 1, 2, 3) & (cut(4) | aS(list) | aS(torch.stack)) | deref()
    indicess = (feats @ feat).topk(9).indices.numpy() # reuse the question embedding computed above instead of calling the API again
    c = viz.Carousel(); contexts = []; page1 = ""
    for i, indices in indicess | batched(3) | aS(enumerate):
        indices | apply(lambda x: records[x]) | ~apply(lambda _cat, name, idx, para: f"<h3 style='color:red'>{_cat} - {name} - #{idx}</h3><p>{para}</p>") | join("\n") | aS(contexts.append)
    def genAns(context): return turbo(question, context) if model == "gpt-3.5-turbo" else normal(question, model, context)
    answers = contexts | applyMp(genAns, timeout=60*5) | deref()
    for i in range(3): page1 += f"<h2>Answer with context: {i*3}-{(i+1)*3}</h2>{answers[i]}"
    answer2 = turbo(question) if model == "gpt-3.5-turbo" else normal(question, model)
    page1 += f"<h2>Answer without context - 1</h2>{answer2}"; page1.replace("\n", "<br>") | c
    context = "\n".join(contexts); f"<h2>Context</h2>{context}".replace("\n", "<br>") | c
    for _cat, name, idxs in indicess | apply(lambda x: records[x]) | cut(0, 1, 2) | ~apply(lambda _cat, name, idx: [_cat, name, f"{_cat}/{name}", idx]) | groupBy(2) | apply(cut(0, 1, 3) | transpose() | item() + item() + (sort(None) | aS(set))):
        paras = cat(f"{base}/cats/{_cat}/{name}", False) | aS(dill.loads) | inSet(idxs, 2).split()\
            | apply(lambda x: f"<p style='font-weight: bold; color: red'>{x}</p>", 3) + apply(lambda x: f"<p>{x}</p>", 3) | joinStreams()\
            | sort(2) | cut(3) | join("\n") | op().replace("\n", "<br>"); f"<h3>{_cat}/{name}</h3>{paras}" | c
    return c._repr_html_()
Migrating the wbw (Wait But Why) data from the last notebook over to this one:
#notest
base1 = "~/ssd/data/waitbutwhy"
idxs = f"{base1}/meta" | ls() | op().split("/")[-1].all() | deref()
records, feats = idxs | apply(lambda idx: cat(f"{base1}/embeddings/{idx}", False) | aS(dill.loads) | cut(0, 1) | apply(toTensor(), 1) | insert(idx).all()) | joinStreams() | deref() | (cut(0, 1) | deref() | aS(k1.Wrapper)) & (cut(2) | aS(list) | aS(torch.stack))
[records(), feats[:,None]] | transpose() | joinStreams().all() | groupBy(0) | applyMp(item(2) & (insertIdColumn(True) | ~apply(lambda idx, name, para, feat: ["wbw", name, idx, para, np.array(feat)]) | deref()) | ~aS(lambda name, data: data | aS(dill.dumps) | file(f"{base}/cats/wbw/{name}")), prefetch=16) | ignore()
res = question("what is the panic monster?", "wbw")
res | aS(IPython.display.HTML)
res = question("Why is metal toxic?")
res | aS(IPython.display.HTML)
res = question("Why is metal toxic?", model="text-davinci-003")
res | aS(IPython.display.HTML)
Quite nice. Let's finish things up.
# serve
def status() -> serve.html():
    try:
        a = store()[0] | groupBy(0) | apply(groupBy(1) | sort(2).all()) | deref() | aS(k1.Wrapper)
        fig, axes = plt.subplots(2, 2, figsize=(10, 8)); axes = axes.flatten()
        plt.sca(axes[0]); a() | apply(item(3) & shape(0)) | transpose() | deref() | ~aS(plt.bar)
        plt.xlabel("Category"); plt.ylabel("Articles"); plt.xticks(rotation=75)
        plt.sca(axes[1]); a() | shape(0).all(2) | joinStreams() | deref() | aS(plt.hist, bins=30);
        plt.xlabel("#paragraphs/article"); plt.ylabel("Frequency");
        plt.sca(axes[2]); a() | (cut(3) | shape(0).all()).all(2) | joinStreams(2) | deref() | aS(plt.hist, bins=30);
        plt.xlabel("#characters/paragraph"); plt.ylabel("Frequency");
        plt.sca(axes[3]); a() | (cut(3) | shape(0).all() | toSum()).all(2) | joinStreams() | deref() | aS(plt.hist, bins=30);
        plt.xlabel("#characters/article"); plt.ylabel("Frequency"); plt.tight_layout(); return viz.Carousel([plt.gcf() | toImg()])._repr_html_()
    except Exception as e:
        with k1.captureStdout(False) as out: traceback.print_exc()
        out = out() | join("\n")
        return f"<pre style='color: red'>{out}</pre>"
# serve
def endpoint(mode:["question", "status", "deposit", "lookupWiki", "lookupWebsite"]="question", content:serve.text()="What is the panic monster?", category:str="", name:str="",
             model:["gpt-3.5-turbo", "text-davinci-003", "text-curie-001", "text-babbage-001", "text-ada-001"]="gpt-3.5-turbo", password:serve.text(password=True)="") -> serve.html():
    """Tool to quickly deposit articles and ask questions about all stored articles, powered by large language models.
    There are several modes:
    <h3>"question" mode</h3>
    You can ask the system any question you'd like. It'll search the knowledge base for relevant
    information and use that to answer your question in natural language. Active parameters:
    <ul>
    <li>content: put your question here</li>
    <li>category: if specified, then searches in the specified category only, else searches in all categories</li>
    <li>model: the model that you want to answer the question with</li>
    </ul>
    <h3>"status" mode</h3>
    Gets general statistics on the knowledge base. No parameters are active.
    <h3>"deposit" mode</h3>
    Here, you can deposit articles into the system. It will take the article in, split it up into multiple
    sections, then index and save them to disk.
    <ul>
    <li>content: put your article's contents here. Just copy-paste it in</li>
    <li>category: if specified, then dumps the article in a specific category, else dumps it in the default category</li>
    <li>name: if specified, then uses this short name to identify the article, else it uses the current unix time</li>
    <li>password: the password necessary to deposit the information</li>
    </ul>
    So, this isn't really free for the general public to deposit articles into. First and foremost,
    it's a tool for myself. If you want a similar system, all the code is available for you to set
    it up yourself; check out the source code <a href="/lm/13-knowledge-base">here</a>.
    <h3>"lookupWiki" mode</h3>
    In this mode, you can enter a search term in the `content` parameter, like "RS-25 engine", and it will
    automatically fire up Chrome, search Google for the term, grab all urls from wikipedia,
    and finally index them.
    <ul>
    <li>content: put your search term here</li>
    <li>password: the password necessary to look the term up</li>
    </ul>
    <h3>"lookupWebsite" mode</h3>
    In this mode, you can enter a website url in the `content` parameter, like "https://www.nature.com/articles/s41586-020-2528-x",
    and it will automatically fire up Chrome, go to the website, grab the article, then index it.
    <ul>
    <li>content: put your website url here</li>
    <li>password: the password necessary to index the page</li>
    </ul>"""
    try:
        if mode == "question": return question(content, category, model)
        if mode == "status": return status()
        if mode == "deposit": return deposit(content, category, name, password)
        if mode == "lookupWiki": return lookupWiki(content, password)
        if mode == "lookupWebsite": return lookupWebsite(content, password)
    except Exception as e:
        with k1.captureStdout(False) as out: traceback.print_exc()
        out = out() | join('\n')
        return f"<pre style='color: red'>{out}</pre>"
Because I don't really have any image for the thumbnail, I'm just going to use the Raptor engine:
# thumbnail
"raptor.png" | toImg()
You know what, let's call this tool "Raptor". It sounds cute.