You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

406 lines
14 KiB

11 months ago
  1. const { QdrantClient } = require("@qdrant/js-client-rest");
  2. const { TextSplitter } = require("../../TextSplitter");
  3. const { SystemSettings } = require("../../../models/systemSettings");
  4. const { storeVectorResult, cachedVectorInformation } = require("../../files");
  5. const { v4: uuidv4 } = require("uuid");
  6. const { toChunks, getEmbeddingEngineSelection } = require("../../helpers");
  7. const { sourceIdentifier } = require("../../chats");
  8. const QDrant = {
  9. name: "QDrant",
  10. connect: async function () {
  11. if (process.env.VECTOR_DB !== "qdrant")
  12. throw new Error("QDrant::Invalid ENV settings");
  13. const client = new QdrantClient({
  14. url: process.env.QDRANT_ENDPOINT,
  15. ...(process.env.QDRANT_API_KEY
  16. ? { apiKey: process.env.QDRANT_API_KEY }
  17. : {}),
  18. });
  19. const isAlive = (await client.api("cluster")?.clusterStatus())?.ok || false;
  20. if (!isAlive)
  21. throw new Error(
  22. "QDrant::Invalid Heartbeat received - is the instance online?"
  23. );
  24. return { client };
  25. },
  26. heartbeat: async function () {
  27. await this.connect();
  28. return { heartbeat: Number(new Date()) };
  29. },
  30. totalVectors: async function () {
  31. const { client } = await this.connect();
  32. const { collections } = await client.getCollections();
  33. var totalVectors = 0;
  34. for (const collection of collections) {
  35. if (!collection || !collection.name) continue;
  36. totalVectors +=
  37. (await this.namespace(client, collection.name))?.vectorCount || 0;
  38. }
  39. return totalVectors;
  40. },
  41. namespaceCount: async function (_namespace = null) {
  42. const { client } = await this.connect();
  43. const namespace = await this.namespace(client, _namespace);
  44. return namespace?.vectorCount || 0;
  45. },
  46. similarityResponse: async function ({
  47. client,
  48. namespace,
  49. queryVector,
  50. similarityThreshold = 0.25,
  51. topN = 4,
  52. filterIdentifiers = [],
  53. }) {
  54. const result = {
  55. contextTexts: [],
  56. sourceDocuments: [],
  57. scores: [],
  58. };
  59. const responses = await client.search(namespace, {
  60. vector: queryVector,
  61. limit: topN,
  62. with_payload: true,
  63. });
  64. responses.forEach((response) => {
  65. if (response.score < similarityThreshold) return;
  66. if (filterIdentifiers.includes(sourceIdentifier(response?.payload))) {
  67. console.log(
  68. "QDrant: A source was filtered from context as it's parent document is pinned."
  69. );
  70. return;
  71. }
  72. result.contextTexts.push(response?.payload?.text || "");
  73. result.sourceDocuments.push({
  74. ...(response?.payload || {}),
  75. id: response.id,
  76. });
  77. result.scores.push(response.score);
  78. });
  79. return result;
  80. },
  81. namespace: async function (client, namespace = null) {
  82. if (!namespace) throw new Error("No namespace value provided.");
  83. const collection = await client.getCollection(namespace).catch(() => null);
  84. if (!collection) return null;
  85. return {
  86. name: namespace,
  87. ...collection,
  88. vectorCount: (await client.count(namespace, { exact: true })).count,
  89. };
  90. },
  91. hasNamespace: async function (namespace = null) {
  92. if (!namespace) return false;
  93. const { client } = await this.connect();
  94. return await this.namespaceExists(client, namespace);
  95. },
  96. namespaceExists: async function (client, namespace = null) {
  97. if (!namespace) throw new Error("No namespace value provided.");
  98. const collection = await client.getCollection(namespace).catch((e) => {
  99. console.error("QDrant::namespaceExists", e.message);
  100. return null;
  101. });
  102. return !!collection;
  103. },
  104. deleteVectorsInNamespace: async function (client, namespace = null) {
  105. await client.deleteCollection(namespace);
  106. return true;
  107. },
  108. // QDrant requires a dimension aspect for collection creation
  109. // we pass this in from the first chunk to infer the dimensions like other
  110. // providers do.
  111. getOrCreateCollection: async function (client, namespace, dimensions = null) {
  112. if (await this.namespaceExists(client, namespace)) {
  113. return await client.getCollection(namespace);
  114. }
  115. if (!dimensions)
  116. throw new Error(
  117. `Qdrant:getOrCreateCollection Unable to infer vector dimension from input. Open an issue on GitHub for support.`
  118. );
  119. await client.createCollection(namespace, {
  120. vectors: {
  121. size: dimensions,
  122. distance: "Cosine",
  123. },
  124. });
  125. return await client.getCollection(namespace);
  126. },
  127. addDocumentToNamespace: async function (
  128. namespace,
  129. documentData = {},
  130. fullFilePath = null,
  131. skipCache = false
  132. ) {
  133. const { DocumentVectors } = require("../../../models/vectors");
  134. try {
  135. let vectorDimension = null;
  136. const { pageContent, docId, ...metadata } = documentData;
  137. if (!pageContent || pageContent.length == 0) return false;
  138. console.log("Adding new vectorized document into namespace", namespace);
  139. if (skipCache) {
  140. const cacheResult = await cachedVectorInformation(fullFilePath);
  141. if (cacheResult.exists) {
  142. const { client } = await this.connect();
  143. const { chunks } = cacheResult;
  144. const documentVectors = [];
  145. vectorDimension =
  146. chunks[0][0]?.vector?.length ??
  147. chunks[0][0]?.values?.length ??
  148. null;
  149. const collection = await this.getOrCreateCollection(
  150. client,
  151. namespace,
  152. vectorDimension
  153. );
  154. if (!collection)
  155. throw new Error("Failed to create new QDrant collection!", {
  156. namespace,
  157. });
  158. for (const chunk of chunks) {
  159. const submission = {
  160. ids: [],
  161. vectors: [],
  162. payloads: [],
  163. };
  164. // Before sending to Qdrant and saving the records to our db
  165. // we need to assign the id of each chunk that is stored in the cached file.
  166. // The id property must be defined or else it will be unable to be managed by ALLM.
  167. chunk.forEach((chunk) => {
  168. const id = uuidv4();
  169. if (chunk?.payload?.hasOwnProperty("id")) {
  170. const { id: _id, ...payload } = chunk.payload;
  171. documentVectors.push({ docId, vectorId: id });
  172. submission.ids.push(id);
  173. submission.vectors.push(chunk.vector);
  174. submission.payloads.push(payload);
  175. } else {
  176. console.error(
  177. "The 'id' property is not defined in chunk.payload - it will be omitted from being inserted in QDrant collection."
  178. );
  179. }
  180. });
  181. const additionResult = await client.upsert(namespace, {
  182. wait: true,
  183. batch: { ...submission },
  184. });
  185. if (additionResult?.status !== "completed")
  186. throw new Error("Error embedding into QDrant", additionResult);
  187. }
  188. await DocumentVectors.bulkInsert(documentVectors);
  189. return { vectorized: true, error: null };
  190. }
  191. }
  192. // If we are here then we are going to embed and store a novel document.
  193. // We have to do this manually as opposed to using LangChains `Qdrant.fromDocuments`
  194. // because we then cannot atomically control our namespace to granularly find/remove documents
  195. // from vectordb.
  196. const EmbedderEngine = getEmbeddingEngineSelection();
  197. const textSplitter = new TextSplitter({
  198. chunkSize: TextSplitter.determineMaxChunkSize(
  199. await SystemSettings.getValueOrFallback({
  200. label: "text_splitter_chunk_size",
  201. }),
  202. EmbedderEngine?.embeddingMaxChunkLength
  203. ),
  204. chunkOverlap: await SystemSettings.getValueOrFallback(
  205. { label: "text_splitter_chunk_overlap" },
  206. 20
  207. ),
  208. chunkHeaderMeta: TextSplitter.buildHeaderMeta(metadata),
  209. });
  210. const textChunks = await textSplitter.splitText(pageContent);
  211. console.log("Chunks created from document:", textChunks.length);
  212. const documentVectors = [];
  213. const vectors = [];
  214. const vectorValues = await EmbedderEngine.embedChunks(textChunks);
  215. const submission = {
  216. ids: [],
  217. vectors: [],
  218. payloads: [],
  219. };
  220. if (!!vectorValues && vectorValues.length > 0) {
  221. for (const [i, vector] of vectorValues.entries()) {
  222. if (!vectorDimension) vectorDimension = vector.length;
  223. const vectorRecord = {
  224. id: uuidv4(),
  225. vector: vector,
  226. // [DO NOT REMOVE]
  227. // LangChain will be unable to find your text if you embed manually and dont include the `text` key.
  228. // https://github.com/hwchase17/langchainjs/blob/2def486af734c0ca87285a48f1a04c057ab74bdf/langchain/src/vectorstores/pinecone.ts#L64
  229. payload: { ...metadata, text: textChunks[i] },
  230. };
  231. submission.ids.push(vectorRecord.id);
  232. submission.vectors.push(vectorRecord.vector);
  233. submission.payloads.push(vectorRecord.payload);
  234. vectors.push(vectorRecord);
  235. documentVectors.push({ docId, vectorId: vectorRecord.id });
  236. }
  237. } else {
  238. throw new Error(
  239. "Could not embed document chunks! This document will not be recorded."
  240. );
  241. }
  242. const { client } = await this.connect();
  243. const collection = await this.getOrCreateCollection(
  244. client,
  245. namespace,
  246. vectorDimension
  247. );
  248. if (!collection)
  249. throw new Error("Failed to create new QDrant collection!", {
  250. namespace,
  251. });
  252. if (vectors.length > 0) {
  253. const chunks = [];
  254. console.log("Inserting vectorized chunks into QDrant collection.");
  255. for (const chunk of toChunks(vectors, 500)) chunks.push(chunk);
  256. const additionResult = await client.upsert(namespace, {
  257. wait: true,
  258. batch: {
  259. ids: submission.ids,
  260. vectors: submission.vectors,
  261. payloads: submission.payloads,
  262. },
  263. });
  264. if (additionResult?.status !== "completed")
  265. throw new Error("Error embedding into QDrant", additionResult);
  266. await storeVectorResult(chunks, fullFilePath);
  267. }
  268. await DocumentVectors.bulkInsert(documentVectors);
  269. return { vectorized: true, error: null };
  270. } catch (e) {
  271. console.error("addDocumentToNamespace", e.message);
  272. return { vectorized: false, error: e.message };
  273. }
  274. },
  275. deleteDocumentFromNamespace: async function (namespace, docId) {
  276. const { DocumentVectors } = require("../../../models/vectors");
  277. const { client } = await this.connect();
  278. if (!(await this.namespaceExists(client, namespace))) return;
  279. const knownDocuments = await DocumentVectors.where({ docId });
  280. if (knownDocuments.length === 0) return;
  281. const vectorIds = knownDocuments.map((doc) => doc.vectorId);
  282. await client.delete(namespace, {
  283. wait: true,
  284. points: vectorIds,
  285. });
  286. const indexes = knownDocuments.map((doc) => doc.id);
  287. await DocumentVectors.deleteIds(indexes);
  288. return true;
  289. },
  290. performSimilaritySearch: async function ({
  291. namespace = null,
  292. input = "",
  293. LLMConnector = null,
  294. similarityThreshold = 0.25,
  295. topN = 4,
  296. filterIdentifiers = [],
  297. }) {
  298. if (!namespace || !input || !LLMConnector)
  299. throw new Error("Invalid request to performSimilaritySearch.");
  300. const { client } = await this.connect();
  301. if (!(await this.namespaceExists(client, namespace))) {
  302. return {
  303. contextTexts: [],
  304. sources: [],
  305. message: "Invalid query - no documents found for workspace!",
  306. };
  307. }
  308. const queryVector = await LLMConnector.embedTextInput(input);
  309. const { contextTexts, sourceDocuments } = await this.similarityResponse({
  310. client,
  311. namespace,
  312. queryVector,
  313. similarityThreshold,
  314. topN,
  315. filterIdentifiers,
  316. });
  317. const sources = sourceDocuments.map((metadata, i) => {
  318. return { ...metadata, text: contextTexts[i] };
  319. });
  320. return {
  321. contextTexts,
  322. sources: this.curateSources(sources),
  323. message: false,
  324. };
  325. },
  326. "namespace-stats": async function (reqBody = {}) {
  327. const { namespace = null } = reqBody;
  328. if (!namespace) throw new Error("namespace required");
  329. const { client } = await this.connect();
  330. if (!(await this.namespaceExists(client, namespace)))
  331. throw new Error("Namespace by that name does not exist.");
  332. const stats = await this.namespace(client, namespace);
  333. return stats
  334. ? stats
  335. : { message: "No stats were able to be fetched from DB for namespace" };
  336. },
  337. "delete-namespace": async function (reqBody = {}) {
  338. const { namespace = null } = reqBody;
  339. const { client } = await this.connect();
  340. if (!(await this.namespaceExists(client, namespace)))
  341. throw new Error("Namespace by that name does not exist.");
  342. const details = await this.namespace(client, namespace);
  343. await this.deleteVectorsInNamespace(client, namespace);
  344. return {
  345. message: `Namespace ${namespace} was deleted along with ${details?.vectorCount} vectors.`,
  346. };
  347. },
  348. reset: async function () {
  349. const { client } = await this.connect();
  350. const response = await client.getCollections();
  351. for (const collection of response.collections) {
  352. await client.deleteCollection(collection.name);
  353. }
  354. return { reset: true };
  355. },
  356. curateSources: function (sources = []) {
  357. const documents = [];
  358. for (const source of sources) {
  359. if (Object.keys(source).length > 0) {
  360. const metadata = source.hasOwnProperty("metadata")
  361. ? source.metadata
  362. : source;
  363. documents.push({
  364. ...metadata,
  365. });
  366. }
  367. }
  368. return documents;
  369. },
  370. };
  371. module.exports.QDrant = QDrant;