You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

408 lines
14 KiB

11 months ago
  1. const {
  2. DataType,
  3. MetricType,
  4. IndexType,
  5. MilvusClient,
  6. } = require("@zilliz/milvus2-sdk-node");
  7. const { TextSplitter } = require("../../TextSplitter");
  8. const { SystemSettings } = require("../../../models/systemSettings");
  9. const { v4: uuidv4 } = require("uuid");
  10. const { storeVectorResult, cachedVectorInformation } = require("../../files");
  11. const { toChunks, getEmbeddingEngineSelection } = require("../../helpers");
  12. const { sourceIdentifier } = require("../../chats");
  /**
   * Vector database provider backed by Milvus.
   *
   * Every method that talks to the database maps the caller-facing namespace to
   * a Milvus collection name through `normalize`, so callers always pass the raw
   * namespace and never the stored collection name.
   */
  const Milvus = {
    name: "Milvus",

    /**
     * Convert a namespace into a collection name that Milvus accepts.
     *
     * Milvus/Zilliz only allows letters, numbers, and underscores in collection
     * names, and the first character must be a letter or an underscore. Every
     * disallowed character is replaced with `_` and the result is always prefixed
     * with `anythingllm_`.
     *
     * The prefix is applied unconditionally on purpose: names that start with a
     * letter or underscore were already prefixed before, so their collection
     * names are unchanged, while names that start with a digit (or are empty)
     * previously skipped the prefix and produced an invalid collection name.
     *
     * This function is not idempotent - never feed it a name that was already
     * normalized (for example one returned by `listCollections`).
     *
     * @param {string} inputString - Raw namespace.
     * @returns {string} Collection name to use when communicating with the DB.
     * @throws {TypeError} If `inputString` is not a string.
     */
    normalize: function (inputString) {
      if (typeof inputString !== "string")
        throw new TypeError(
          "Milvus::normalize expects the namespace to be a string."
        );
      const sanitized = inputString.replace(/[^a-zA-Z0-9_]/g, "_");
      return `anythingllm_${sanitized}`;
    },

    /**
     * Build a client from the MILVUS_* environment variables and verify that the
     * instance reports itself as healthy.
     *
     * @returns {Promise<{client: object}>} The connected client.
     * @throws {Error} If VECTOR_DB is not "milvus" or the health check fails.
     */
    connect: async function () {
      if (process.env.VECTOR_DB !== "milvus")
        throw new Error("Milvus::Invalid ENV settings");

      const client = new MilvusClient({
        address: process.env.MILVUS_ADDRESS,
        username: process.env.MILVUS_USERNAME,
        password: process.env.MILVUS_PASSWORD,
      });

      const { isHealthy } = await client.checkHealth();
      if (!isHealthy)
        throw new Error(
          "MilvusDB::Invalid Heartbeat received - is the instance online?"
        );

      return { client };
    },

    /**
     * Connect to the instance and report the current time as a heartbeat.
     *
     * @returns {Promise<{heartbeat: number}>} Epoch milliseconds of the check.
     */
    heartbeat: async function () {
      await this.connect();
      return { heartbeat: Number(new Date()) };
    },

    /**
     * Sum the reported row count of every collection in the instance.
     *
     * The names returned by `listCollections` are the stored collection names,
     * so they are used as-is; running them through `normalize` again would add a
     * second prefix and point at collections that do not exist. Collections are
     * queried one after another so the running total is always a plain number.
     *
     * @returns {Promise<number>} Total row count across all collections.
     */
    totalVectors: async function () {
      const { client } = await this.connect();
      const { collection_names } = await client.listCollections();

      let total = 0;
      for (const collection_name of collection_names ?? []) {
        const statistics = await client.getCollectionStatistics({
          collection_name,
        });
        total += Number(statistics?.data?.row_count ?? 0);
      }
      return total;
    },

    /**
     * Report the row count of the collection backing a namespace.
     *
     * @param {string} _namespace - Raw namespace.
     * @returns {Promise<number>} Row count, or 0 when none is reported.
     */
    namespaceCount: async function (_namespace = null) {
      const { client } = await this.connect();
      const statistics = await client.getCollectionStatistics({
        collection_name: this.normalize(_namespace),
      });
      return Number(statistics?.data?.row_count ?? 0);
    },

    /**
     * Fetch the statistics of the collection backing a namespace.
     *
     * @param {object} client - Connected Milvus client.
     * @param {string} namespace - Raw namespace.
     * @returns {Promise<object|null>} Statistics response, or null when the
     *   statistics request rejects.
     * @throws {Error} If no namespace is provided.
     */
    namespace: async function (client, namespace = null) {
      if (!namespace) throw new Error("No namespace value provided.");
      const collection = await client
        .getCollectionStatistics({ collection_name: this.normalize(namespace) })
        .catch(() => null);
      return collection;
    },

    /**
     * Check whether a namespace has a collection, opening its own connection.
     *
     * @param {string} namespace - Raw namespace.
     * @returns {Promise<boolean>} False when no namespace is given.
     */
    hasNamespace: async function (namespace = null) {
      if (!namespace) return false;
      const { client } = await this.connect();
      return await this.namespaceExists(client, namespace);
    },

    /**
     * Check whether a namespace has a collection using an existing client.
     * A rejected lookup is logged and treated as "does not exist".
     *
     * @param {object} client - Connected Milvus client.
     * @param {string} namespace - Raw namespace.
     * @returns {Promise<boolean>} The `value` reported by `hasCollection`.
     * @throws {Error} If no namespace is provided.
     */
    namespaceExists: async function (client, namespace = null) {
      if (!namespace) throw new Error("No namespace value provided.");
      const { value } = await client
        .hasCollection({ collection_name: this.normalize(namespace) })
        .catch((e) => {
          console.error("MilvusDB::namespaceExists", e.message);
          return { value: false };
        });
      return value;
    },

    /**
     * Drop the whole collection backing a namespace.
     *
     * @param {object} client - Connected Milvus client.
     * @param {string} namespace - Raw namespace.
     * @returns {Promise<boolean>} Always true once the drop call resolves.
     */
    deleteVectorsInNamespace: async function (client, namespace = null) {
      await client.dropCollection({ collection_name: this.normalize(namespace) });
      return true;
    },

    /**
     * Ensure the collection for a namespace exists, creating, indexing and
     * loading it when it does not.
     *
     * Milvus requires a dimension aspect for collection creation; we pass this
     * in from the first chunk to infer the dimensions like other providers do.
     *
     * @param {object} client - Connected Milvus client.
     * @param {string} namespace - Raw namespace.
     * @param {number|null} dimensions - Vector length, required on creation.
     * @returns {Promise<void>}
     * @throws {Error} If the collection must be created and no dimension is given.
     */
    getOrCreateCollection: async function (client, namespace, dimensions = null) {
      const isExists = await this.namespaceExists(client, namespace);
      if (isExists) return;

      if (!dimensions)
        throw new Error(
          `Milvus:getOrCreateCollection Unable to infer vector dimension from input. Open an issue on GitHub for support.`
        );

      const collection_name = this.normalize(namespace);
      await client.createCollection({
        collection_name,
        fields: [
          {
            name: "id",
            description: "id",
            data_type: DataType.VarChar,
            max_length: 255,
            is_primary_key: true,
          },
          {
            name: "vector",
            description: "vector",
            data_type: DataType.FloatVector,
            dim: dimensions,
          },
          {
            name: "metadata",
            description: "metadata",
            data_type: DataType.JSON,
          },
        ],
      });
      await client.createIndex({
        collection_name,
        field_name: "vector",
        index_type: IndexType.AUTOINDEX,
        metric_type: MetricType.COSINE,
      });
      await client.loadCollectionSync({ collection_name });
    },

    /**
     * Embed a document and store its vectors in the namespace's collection.
     *
     * Unless `skipCache` is set, a cached vectorization of `fullFilePath` is
     * reused when one exists. Otherwise the text is split, embedded, inserted in
     * batches of 100 and the result is written to the cache.
     *
     * @param {string} namespace - Raw namespace.
     * @param {object} documentData - Must carry `pageContent` and `docId`; every
     *   other key is stored as metadata on each vector.
     * @param {string|null} fullFilePath - Key used for the vector cache.
     * @param {boolean} skipCache - When true, ignore any cached vectors and
     *   embed the document again.
     * @returns {Promise<false|{vectorized: boolean, error: string|null}>} `false`
     *   when the document has no content, otherwise the outcome object. Failures
     *   inside the embed/insert flow are logged and reported through `error`.
     */
    addDocumentToNamespace: async function (
      namespace,
      documentData = {},
      fullFilePath = null,
      skipCache = false
    ) {
      // Required lazily inside the function, as in the original module.
      const { DocumentVectors } = require("../../../models/vectors");
      try {
        let vectorDimension = null;
        const { pageContent, docId, ...metadata } = documentData;
        if (!pageContent || pageContent.length === 0) return false;

        console.log("Adding new vectorized document into namespace", namespace);
        if (!skipCache) {
          const cacheResult = await cachedVectorInformation(fullFilePath);
          if (cacheResult.exists) {
            const { client } = await this.connect();
            const { chunks } = cacheResult;
            const documentVectors = [];
            vectorDimension = chunks[0][0].values.length || null;

            await this.getOrCreateCollection(client, namespace, vectorDimension);
            try {
              for (const chunk of chunks) {
                // Before sending to Milvus and saving the records to our db
                // we need to assign the id of each chunk that is stored in the cached file.
                const newChunks = chunk.map((cachedItem) => {
                  const id = uuidv4();
                  documentVectors.push({ docId, vectorId: id });
                  return {
                    id,
                    vector: cachedItem.values,
                    metadata: cachedItem.metadata,
                  };
                });
                const insertResult = await client.insert({
                  collection_name: this.normalize(namespace),
                  data: newChunks,
                });

                if (insertResult?.status?.error_code !== "Success") {
                  throw new Error(
                    `Error embedding into Milvus! Reason:${insertResult?.status?.reason}`
                  );
                }
              }
              await DocumentVectors.bulkInsert(documentVectors);
              await client.flushSync({
                collection_names: [this.normalize(namespace)],
              });
              return { vectorized: true, error: null };
            } catch (insertError) {
              console.error(
                "Error inserting cached chunks:",
                insertError.message
              );
              return { vectorized: false, error: insertError.message };
            }
          }
        }

        const EmbedderEngine = getEmbeddingEngineSelection();
        const textSplitter = new TextSplitter({
          chunkSize: TextSplitter.determineMaxChunkSize(
            await SystemSettings.getValueOrFallback({
              label: "text_splitter_chunk_size",
            }),
            EmbedderEngine?.embeddingMaxChunkLength
          ),
          chunkOverlap: await SystemSettings.getValueOrFallback(
            { label: "text_splitter_chunk_overlap" },
            20
          ),
          chunkHeaderMeta: TextSplitter.buildHeaderMeta(metadata),
        });
        const textChunks = await textSplitter.splitText(pageContent);

        console.log("Chunks created from document:", textChunks.length);
        const documentVectors = [];
        const vectors = [];
        const vectorValues = await EmbedderEngine.embedChunks(textChunks);

        if (!!vectorValues && vectorValues.length > 0) {
          for (const [i, vector] of vectorValues.entries()) {
            // The first embedded chunk decides the collection's dimension.
            if (!vectorDimension) vectorDimension = vector.length;
            const vectorRecord = {
              id: uuidv4(),
              values: vector,
              // [DO NOT REMOVE]
              // LangChain will be unable to find your text if you embed manually and dont include the `text` key.
              metadata: { ...metadata, text: textChunks[i] },
            };

            vectors.push(vectorRecord);
            documentVectors.push({ docId, vectorId: vectorRecord.id });
          }
        } else {
          throw new Error(
            "Could not embed document chunks! This document will not be recorded."
          );
        }

        if (vectors.length > 0) {
          const chunks = [];
          const { client } = await this.connect();
          await this.getOrCreateCollection(client, namespace, vectorDimension);

          console.log("Inserting vectorized chunks into Milvus.");
          for (const chunk of toChunks(vectors, 100)) {
            chunks.push(chunk);
            const insertResult = await client.insert({
              collection_name: this.normalize(namespace),
              data: chunk.map((item) => ({
                id: item.id,
                vector: item.values,
                metadata: item.metadata,
              })),
            });

            if (insertResult?.status?.error_code !== "Success") {
              throw new Error(
                `Error embedding into Milvus! Reason:${insertResult?.status?.reason}`
              );
            }
          }
          await storeVectorResult(chunks, fullFilePath);
          await client.flushSync({
            collection_names: [this.normalize(namespace)],
          });
        }

        await DocumentVectors.bulkInsert(documentVectors);
        return { vectorized: true, error: null };
      } catch (e) {
        console.error("addDocumentToNamespace", e.message);
        return { vectorized: false, error: e.message };
      }
    },

    /**
     * Remove every vector recorded for a document from the namespace's
     * collection and from the local DocumentVectors records.
     *
     * @param {string} namespace - Raw namespace.
     * @param {string} docId - Document whose vectors should be removed.
     * @returns {Promise<true|undefined>} True after a deletion; undefined when the
     *   namespace does not exist or no vectors are recorded for the document.
     */
    deleteDocumentFromNamespace: async function (namespace, docId) {
      const { DocumentVectors } = require("../../../models/vectors");
      const { client } = await this.connect();
      if (!(await this.namespaceExists(client, namespace))) return;

      const knownDocuments = await DocumentVectors.where({ docId });
      if (knownDocuments.length === 0) return;

      const vectorIds = knownDocuments.map((doc) => doc.vectorId);
      const queryIn = vectorIds.map((v) => `'${v}'`).join(",");
      await client.deleteEntities({
        collection_name: this.normalize(namespace),
        expr: `id in [${queryIn}]`,
      });

      const indexes = knownDocuments.map((doc) => doc.id);
      await DocumentVectors.deleteIds(indexes);

      // Even after flushing Milvus can take some time to re-calc the count
      // so all we can hope to do is flushSync so that the count can be correct
      // on a later call.
      await client.flushSync({ collection_names: [this.normalize(namespace)] });
      return true;
    },

    /**
     * Embed the input text and return the closest stored chunks of a namespace.
     *
     * @param {object} params
     * @param {string} params.namespace - Raw namespace to search.
     * @param {string} params.input - Text to embed and search with.
     * @param {object} params.LLMConnector - Must expose `embedTextInput(input)`.
     * @param {number} [params.similarityThreshold=0.25] - Minimum match score.
     * @param {number} [params.topN=4] - Maximum number of matches requested.
     * @param {string[]} [params.filterIdentifiers=[]] - Source identifiers to
     *   exclude from the result.
     * @returns {Promise<{contextTexts: string[], sources: object[], message: string|false}>}
     * @throws {Error} If namespace, input or LLMConnector is missing.
     */
    performSimilaritySearch: async function ({
      namespace = null,
      input = "",
      LLMConnector = null,
      similarityThreshold = 0.25,
      topN = 4,
      filterIdentifiers = [],
    }) {
      if (!namespace || !input || !LLMConnector)
        throw new Error("Invalid request to performSimilaritySearch.");

      const { client } = await this.connect();
      if (!(await this.namespaceExists(client, namespace))) {
        return {
          contextTexts: [],
          sources: [],
          message: "Invalid query - no documents found for workspace!",
        };
      }

      const queryVector = await LLMConnector.embedTextInput(input);
      const { contextTexts, sourceDocuments } = await this.similarityResponse({
        client,
        namespace,
        queryVector,
        similarityThreshold,
        topN,
        filterIdentifiers,
      });

      // Each source document is a full match object; attach its matched text.
      const sources = sourceDocuments.map((metadata, i) => {
        return { ...metadata, text: contextTexts[i] };
      });
      return {
        contextTexts,
        sources: this.curateSources(sources),
        message: false,
      };
    },

    /**
     * Run the vector search and keep the matches that pass the score threshold
     * and are not excluded by `filterIdentifiers`.
     *
     * @param {object} params
     * @param {object} params.client - Connected Milvus client.
     * @param {string} params.namespace - Raw namespace to search.
     * @param {number[]} params.queryVector - Embedded query.
     * @param {number} [params.similarityThreshold=0.25] - Minimum match score.
     * @param {number} [params.topN=4] - Search limit.
     * @param {string[]} [params.filterIdentifiers=[]] - Source identifiers to skip.
     * @returns {Promise<{contextTexts: string[], sourceDocuments: object[], scores: number[]}>}
     *   Parallel arrays: the text, the whole match and the score of each kept match.
     */
    similarityResponse: async function ({
      client,
      namespace,
      queryVector,
      similarityThreshold = 0.25,
      topN = 4,
      filterIdentifiers = [],
    }) {
      const result = {
        contextTexts: [],
        sourceDocuments: [],
        scores: [],
      };
      const response = await client.search({
        collection_name: this.normalize(namespace),
        vectors: queryVector,
        limit: topN,
      });

      for (const match of response.results) {
        if (match.score < similarityThreshold) continue;
        if (filterIdentifiers.includes(sourceIdentifier(match.metadata))) {
          console.log(
            "Milvus: A source was filtered from context as it's parent document is pinned."
          );
          continue;
        }
        result.contextTexts.push(match.metadata.text);
        result.sourceDocuments.push(match);
        result.scores.push(match.score);
      }
      return result;
    },

    /**
     * Return the statistics of a namespace's collection.
     *
     * @param {{namespace?: string}} reqBody
     * @returns {Promise<object>} The statistics, or a `{ message }` object when
     *   none could be fetched.
     * @throws {Error} If the namespace is missing or does not exist.
     */
    "namespace-stats": async function (reqBody = {}) {
      const { namespace = null } = reqBody;
      if (!namespace) throw new Error("namespace required");

      const { client } = await this.connect();
      if (!(await this.namespaceExists(client, namespace)))
        throw new Error("Namespace by that name does not exist.");

      const stats = await this.namespace(client, namespace);
      return stats
        ? stats
        : { message: "No stats were able to be fetched from DB for namespace" };
    },

    /**
     * Drop a namespace's collection and report how many vectors it held.
     *
     * @param {{namespace?: string}} reqBody
     * @returns {Promise<{message: string}>}
     * @throws {Error} If the namespace is missing or does not exist.
     */
    "delete-namespace": async function (reqBody = {}) {
      const { namespace = null } = reqBody;
      const { client } = await this.connect();
      if (!(await this.namespaceExists(client, namespace)))
        throw new Error("Namespace by that name does not exist.");

      // Read the count before dropping; afterwards it is no longer available.
      const statistics = await this.namespace(client, namespace);
      await this.deleteVectorsInNamespace(client, namespace);
      const vectorCount = Number(statistics?.data?.row_count ?? 0);
      return {
        message: `Namespace ${namespace} was deleted along with ${vectorCount} vectors.`,
      };
    },

    /**
     * Flatten search sources into plain metadata documents.
     *
     * Sources without a non-empty `metadata` object are dropped. When a source
     * carries its own `pageContent`, that value becomes the document's `text`.
     *
     * @param {object[]} sources
     * @returns {object[]} One document per source that has metadata.
     */
    curateSources: function (sources = []) {
      const documents = [];
      for (const source of sources) {
        const { metadata = {} } = source;
        if (Object.keys(metadata).length === 0) continue;

        const hasPageContent = Object.prototype.hasOwnProperty.call(
          source,
          "pageContent"
        );
        documents.push({
          ...metadata,
          ...(hasPageContent ? { text: source.pageContent } : {}),
        });
      }
      return documents;
    },
  };
  382. module.exports.Milvus = Milvus;