Temporary repo to track my changes on LTS functions app porting
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

252 lines
10 KiB

  1. using System;
  2. using System.Text.Json;
  3. using System.Threading.Tasks;
  4. using Azure.Storage.Blobs;
  5. using Azure.Storage.Blobs.Models;
  6. using Azure.Messaging.ServiceBus;
  7. using Microsoft.Azure.Functions.Worker;
  8. using Microsoft.Extensions.Logging;
  9. using Google.Protobuf.Reflection;
  10. namespace CDP
  11. {
  /// <summary>
  /// Service Bus triggered function that deserializes a <see cref="Job"/> from each
  /// queue message and dispatches it to the handler matching its <see cref="Job.EventType"/>.
  /// </summary>
  public class EventProcessor
  {
      // Container names handed to AuditDB when persisting audit records.
      // Declared const because they are fixed identifiers, not mutable state.
      private const string FileAuditContainer = "FileAudits";
      private const string UserAuditContainer = "UserAudits";
      private const string GroupAuditContainer = "GroupAudits";
      private const string TenantAuditContainer = "TenantAudits";

      private readonly ILogger<EventProcessor> _logger;

      /// <summary>Creates the processor with the logger used by all handlers.</summary>
      /// <param name="logger">Logger instance supplied by the caller.</param>
      public EventProcessor(ILogger<EventProcessor> logger)
      {
          _logger = logger;
      }

      /// <summary>
      /// Entry point for messages on "mail-events-queue". Parses the body as a
      /// <see cref="Job"/>, runs the matching handler and then completes the message.
      /// </summary>
      /// <param name="message">The received Service Bus message; its body is expected to be a JSON job.</param>
      /// <param name="messageActions">Settlement actions used to complete the message.</param>
      /// <remarks>
      /// A body that cannot be deserialized is logged and the message is completed
      /// (discarded). Exceptions escaping a handler propagate, so the message is not
      /// completed by this method in that case.
      /// </remarks>
      [Function("ProcessEvent")]
      public async Task ProcessCDPJob(
          [ServiceBusTrigger("mail-events-queue", Connection = "ServiceBusConnectionString")]
          ServiceBusReceivedMessage message,
          ServiceBusMessageActions messageActions)
      {
          // The raw body is intentionally not logged: the job metadata it carries can
          // contain KeyVaultEvent.AESKey values, which must not end up in log storage.
          _logger.LogInformation("Message ID: {id}", message.MessageId);
          _logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);

          Job? cdpJob;
          try
          {
              cdpJob = JsonSerializer.Deserialize<Job>(message.Body);
          }
          catch (Exception ex)
          {
              // Pass the exception as the first argument so the stack trace is captured.
              _logger.LogError(ex, "received incorrect structure in body of message {id}", message.MessageId);
              await messageActions.CompleteMessageAsync(message);
              return;
          }

          if (cdpJob is null)
          {
              // A literal JSON "null" body deserializes to null without throwing.
              _logger.LogWarning("Message {id} deserialized to a null job; nothing to process", message.MessageId);
          }
          else
          {
              switch (cdpJob.EventType)
              {
                  case JobType.MailMetaProcessing:
                      await ProcessMailMetaJob(cdpJob);
                      break;
                  case JobType.KeyVaultInsertion:
                      _logger.LogInformation("key vault insertion processing");
                      await ProcessKeyVaultInsertionJob(cdpJob);
                      break;
                  case JobType.AddAudits:
                      _logger.LogInformation("Adding cdp audits");
                      await ProcessAuditEvent(cdpJob);
                      break;
                  case JobType.KeyVaultInsertionBatch:
                      _logger.LogInformation("batch key vault insertion processing");
                      await ProcessKeyVaultBatchJob(cdpJob);
                      break;
                  case JobType.AddAuditsBatch:
                      _logger.LogInformation("batch audit insertion in progress");
                      await ProcessAuditsBatchJob(cdpJob);
                      break;
                  default:
                      // Numeric values outside the enum deserialize successfully; make the skip visible.
                      _logger.LogWarning("Unsupported job type {eventType} in message {id}; skipping", cdpJob.EventType, message.MessageId);
                      break;
              }
          }

          // Complete the message
          await messageActions.CompleteMessageAsync(message);
      }

      /// <summary>
      /// Handles a single audit job: parses <see cref="Job.JobMetadata"/> as
      /// <see cref="AuditEventMetadata"/> and forwards it to AuditFunctions.AddAudits.
      /// </summary>
      /// <param name="job">Job whose metadata holds one audit event as JSON.</param>
      /// <exception cref="InvalidOperationException">The metadata deserialized to null.</exception>
      internal async Task ProcessAuditEvent(Job job)
      {
          AuditEventMetadata? auditEventMetadata = JsonSerializer.Deserialize<AuditEventMetadata>(job.JobMetadata);
          if (auditEventMetadata is null)
          {
              // Fail with a descriptive error instead of a NullReferenceException below.
              throw new InvalidOperationException($"Audit job {job.Id} has null audit metadata.");
          }

          await AuditFunctions.AddAudits(
              job.AppKey,
              auditEventMetadata.FileId,
              auditEventMetadata.FileName,
              auditEventMetadata.UserId,
              "",
              auditEventMetadata.Action,
              auditEventMetadata.Message);
      }

      /// <summary>
      /// Handles a batch key vault job: parses <see cref="Job.JobMetadata"/> as a list of
      /// <see cref="KeyVaultEvent"/> and stores each key as a secret named by its file id.
      /// </summary>
      /// <param name="job">Job whose metadata holds a JSON array of key vault events.</param>
      /// <remarks>
      /// Metadata that cannot be parsed is logged and the job is dropped. Failures while
      /// writing a secret propagate to the caller, as in
      /// <see cref="ProcessKeyVaultInsertionJob"/>.
      /// </remarks>
      internal async Task ProcessKeyVaultBatchJob(Job job)
      {
          KeyVaultService kvs = new KeyVaultService(Constants.KeyVaultURI);

          List<KeyVaultEvent>? events;
          try
          {
              events = JsonSerializer.Deserialize<List<KeyVaultEvent>>(job.JobMetadata);
          }
          catch (Exception ex)
          {
              // Only the job id is logged; serializing the job would write AES keys to the log.
              _logger.LogError(ex, "Error parsing keyvault batch metadata for job {jobId}", job.Id);
              return;
          }

          if (events is null)
          {
              _logger.LogError("Keyvault batch metadata for job {jobId} deserialized to null", job.Id);
              return;
          }

          // Await each write. List<T>.ForEach with an async lambda produces async void
          // delegates: nothing waits for them and their exceptions cannot be observed here.
          foreach (KeyVaultEvent evt in events)
          {
              await kvs.SetSecretAsync(evt.FileId, evt.AESKey);
          }
      }

      /// <summary>
      /// Handles a batch audit job: parses <see cref="Job.JobMetadata"/> as a list of
      /// <see cref="AuditEventMetadata"/>, appends one file audit record per event, then
      /// appends the tenant and user audit records as two lists.
      /// </summary>
      /// <param name="job">Job whose metadata holds a JSON array of audit events.</param>
      /// <remarks>Any failure is logged and swallowed; the method does not throw.</remarks>
      internal async Task ProcessAuditsBatchJob(Job job)
      {
          try
          {
              List<AuditEventMetadata>? auditEvents = JsonSerializer.Deserialize<List<AuditEventMetadata>>(job.JobMetadata);
              if (auditEvents is null || auditEvents.Count == 0)
              {
                  // Nothing to write; also avoids indexing into an empty list below.
                  _logger.LogWarning("Audit batch job {jobId} contains no audit events", job.Id);
                  return;
              }

              List<AuditRecord> tenantAuditRecords = new List<AuditRecord>(auditEvents.Count);
              List<AuditRecord> userAuditRecords = new List<AuditRecord>(auditEvents.Count);

              foreach (AuditEventMetadata auditEvent in auditEvents)
              {
                  FileAuditRecord fileAuditRecord = new FileAuditRecord
                  {
                      Action = auditEvent.Action,
                      FileId = auditEvent.FileId,
                      FileName = auditEvent.FileName,
                      Message = auditEvent.Message,
                      UserId = auditEvent.UserId,
                      GroupId = ""
                  };
                  await AuditDB.AppendRecord(fileAuditRecord.id, fileAuditRecord, FileAuditContainer);

                  tenantAuditRecords.Add(new TenantAuditRecord
                  {
                      AppKey = job.AppKey,
                      Action = auditEvent.Action,
                      FileId = auditEvent.FileId,
                      FileName = auditEvent.FileName,
                      Message = auditEvent.Message,
                      UserId = auditEvent.UserId,
                      GroupId = "",
                      EventTime = DateTime.UtcNow,
                  });

                  userAuditRecords.Add(new UserAuditRecord
                  {
                      AppKey = job.AppKey,
                      FileId = auditEvent.FileId,
                      FileName = auditEvent.FileName,
                      Message = auditEvent.Message,
                      UserId = auditEvent.UserId,
                      Action = auditEvent.Action,
                      EventTime = DateTime.UtcNow,
                      GroupId = ""
                  });
              }

              await AuditDB.AppendRecordsList(job.AppKey, tenantAuditRecords, TenantAuditContainer);

              // NOTE(review): every user record is appended under the first event's UserId.
              // If a batch can mix users this files records under the wrong user - confirm.
              await AuditDB.AppendRecordsList(userAuditRecords[0].UserId, userAuditRecords, UserAuditContainer);
          }
          catch (Exception ex)
          {
              // Constant template plus exception argument: keeps the stack trace and avoids
              // treating braces inside ex.Message as template placeholders.
              _logger.LogError(ex, "Failed to process audit batch job {jobId}", job.Id);
          }
      }

      /// <summary>
      /// Handles a single key vault job: parses <see cref="Job.JobMetadata"/> as a
      /// <see cref="KeyVaultEvent"/> and stores the key as a secret named by its file id.
      /// </summary>
      /// <param name="cdpJob">Job whose metadata holds one key vault event as JSON.</param>
      /// <remarks>
      /// Unparseable or null metadata is logged and the job is dropped. Failures while
      /// writing the secret propagate to the caller.
      /// </remarks>
      internal async Task ProcessKeyVaultInsertionJob(Job cdpJob)
      {
          KeyVaultService kvs = new KeyVaultService(Constants.KeyVaultURI);

          KeyVaultEvent? vaultEvent;
          try
          {
              vaultEvent = JsonSerializer.Deserialize<KeyVaultEvent>(cdpJob.JobMetadata);
          }
          catch (Exception ex)
          {
              _logger.LogError(ex, "Error parsing keyvault event metadata for job {jobId}", cdpJob.Id);
              return;
          }

          if (vaultEvent is null)
          {
              _logger.LogError("Keyvault event metadata for job {jobId} deserialized to null", cdpJob.Id);
              return;
          }

          await kvs.SetSecretAsync(vaultEvent.FileId, vaultEvent.AESKey);
      }

      /// <summary>
      /// Handles a mail metadata job: downloads the blob named by <see cref="Job.Id"/> from
      /// the container named by the lower-cased <see cref="Job.AppKey"/>, parses it as a
      /// <see cref="MailRecord"/> and passes it to MailProcessor.ProcessMailEvent.
      /// </summary>
      /// <param name="cdpJob">Job identifying the container (AppKey) and blob (Id).</param>
      /// <remarks>Returns without doing anything when the blob does not exist.</remarks>
      internal async Task ProcessMailMetaJob(Job cdpJob)
      {
          var blobServiceClient = new BlobServiceClient(Constants.StorageConnString);

          // Invariant casing: the container name must not depend on the current culture.
          var containerClient = blobServiceClient.GetBlobContainerClient(cdpJob.AppKey.ToLowerInvariant());
          var blob = containerClient.GetBlobClient(cdpJob.Id);

          bool exists = await blob.ExistsAsync();
          if (!exists)
          {
              _logger.LogDebug("Blob {blobName} not found; skipping mail meta job", cdpJob.Id);
              return;
          }

          // BlobDownloadInfo is IDisposable; dispose it so the download is released.
          using BlobDownloadInfo blobDownloadInfo = await blob.DownloadAsync();
          _logger.LogDebug("Download resp: {details}", blobDownloadInfo.Details);

          using MemoryStream ms = new MemoryStream();
          await blobDownloadInfo.Content.CopyToAsync(ms);
          string mailRecJson = System.Text.Encoding.UTF8.GetString(ms.ToArray());

          // Log only the size: the JSON includes the mail body content.
          _logger.LogInformation("got event json from blob {blobName} ({length} chars)", cdpJob.Id, mailRecJson.Length);

          MailRecord? record = JsonSerializer.Deserialize<MailRecord>(mailRecJson);
          if (record is null)
          {
              _logger.LogWarning("Blob {blobName} deserialized to a null mail record; skipping", cdpJob.Id);
              return;
          }

          await MailProcessor.ProcessMailEvent(record, _logger);
      }
  }
  /// <summary>
  /// Mail event payload. EventProcessor deserializes it from the JSON blob it
  /// downloads for a mail metadata job.
  /// </summary>
  public class MailRecord
  {
      /// <summary>Application key carried by the record.</summary>
      public required string AppKey { get; set; }

      /// <summary>Email address of the sender.</summary>
      public required string SenderEmail { get; set; }

      /// <summary>Despite the name, this is the id of the mail body.</summary>
      public required string MailId { get; set; }

      /// <summary>Content of the mail body.</summary>
      public required string BodyContent { get; set; }

      /// <summary>Attachment entries as strings (presumably ids or names; confirm with the producer).</summary>
      public required List<string> Attachments { get; set; }

      /// <summary>Email addresses of the receivers.</summary>
      public required List<string> ReceiverEmails { get; set; }

      /// <summary>Per-attachment details.</summary>
      public required List<AttachmentDetails> AttachmentDetails { get; set; }
  }
  /// <summary>Details of one attachment referenced by a <c>MailRecord</c>.</summary>
  public class AttachmentDetails
  {
      /// <summary>Identifier of the attached file.</summary>
      public required string FileId { get; set; }

      /// <summary>Identifier of the file's head revision.</summary>
      public required string HeadRevisionId { get; set; }

      /// <summary>Optional file type; null when not supplied.</summary>
      public string? FileType { get; set; }

      /// <summary>Name of the attached file.</summary>
      public required string FileName { get; set; }
  }
  /// <summary>
  /// Unit of work deserialized from a Service Bus message body and dispatched
  /// by EventProcessor according to <see cref="EventType"/>.
  /// </summary>
  public class Job
  {
      /// <summary>Job identifier; the mail metadata handler also uses it as the blob name.</summary>
      public required string Id { get; set; }

      /// <summary>Application key; the mail metadata handler lower-cases it to get the blob container name.</summary>
      public required string AppKey { get; set; }

      /// <summary>Selects which handler processes the job.</summary>
      public required JobType EventType { get; set; }

      /// <summary>Optional JSON string that the handlers deserialize into their own metadata type.</summary>
      public string? JobMetadata { get; set; }
  }
  /// <summary>
  /// Kinds of job that EventProcessor dispatches. The numeric values are written
  /// out explicitly and equal the values implied by declaration order.
  /// </summary>
  public enum JobType
  {
      /// <summary>Process mail metadata stored in a blob.</summary>
      MailMetaProcessing = 0,

      /// <summary>Store a single key as a key vault secret.</summary>
      KeyVaultInsertion = 1,

      /// <summary>Add a single audit event.</summary>
      AddAudits = 2,

      /// <summary>Store a batch of keys as key vault secrets.</summary>
      KeyVaultInsertionBatch = 3,

      /// <summary>Add a batch of audit events.</summary>
      AddAuditsBatch = 4
  }
  /// <summary>
  /// Metadata of a key vault job: EventProcessor stores <see cref="AESKey"/>
  /// as a secret named by <see cref="FileId"/>.
  /// </summary>
  public class KeyVaultEvent
  {
      /// <summary>File identifier, used as the secret name.</summary>
      public required string FileId { get; set; }

      /// <summary>Key material, used as the secret value. Sensitive: do not log.</summary>
      public required string AESKey { get; set; }
  }
  /// <summary>
  /// Metadata of an audit job, deserialized by EventProcessor from the job's
  /// JSON metadata (a single object or a list of them).
  /// </summary>
  /// <remarks>
  /// The optional string properties default to <see cref="string.Empty"/> so that a
  /// property missing from the JSON does not surface as null despite the non-nullable
  /// declaration. An explicit JSON <c>null</c> still assigns null.
  /// </remarks>
  public class AuditEventMetadata
  {
      /// <summary>Identifier of the audited file; must be present in the JSON.</summary>
      public required string FileId { get; set; }

      /// <summary>Name of the audited file.</summary>
      public string FileName { get; set; } = string.Empty;

      /// <summary>Identifier of the user the event belongs to.</summary>
      public string UserId { get; set; } = string.Empty;

      /// <summary>Identifier of the group the event belongs to.</summary>
      public string GroupId { get; set; } = string.Empty;

      /// <summary>Action that was audited.</summary>
      public string Action { get; set; } = string.Empty;

      /// <summary>Free-text message attached to the event.</summary>
      public string Message { get; set; } = string.Empty;
  }
  236. }