segment creation with redis bloom filters

Signed-off-by: chandresh pancholi <chandresh.pancholi@navi.com>
This commit is contained in:
chandresh pancholi
2021-10-06 12:18:58 +05:30
parent e8e82cdf74
commit 47ddd6c20a
35 changed files with 512 additions and 48 deletions

View File

@@ -13,6 +13,12 @@
<name>litmus-cache</name>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>

View File

@@ -20,6 +20,11 @@ public class RedisCache implements CacheCommands {
return bloomFilter.tryInit(redisConfiguration.getExpectedInsertions(), redisConfiguration.getFalseProbability());
}
@Override
public RBloomFilter<String> getBf(String name) {
return redissonClient.getBloomFilter(name);
}
@Override
public Boolean add(RBloomFilter<String> bf, String key) {
return bf.add(key);
@@ -29,4 +34,9 @@ public class RedisCache implements CacheCommands {
public Boolean contains(RBloomFilter<String> bf, String key) {
return bf.contains(key);
}
@Override
public Boolean delete(String bucket) {
return redissonClient.getBucket(bucket).delete();
}
}

View File

@@ -5,7 +5,11 @@ import org.redisson.api.RBloomFilter;
public interface CacheCommands {
Boolean reserveBF(String name);
RBloomFilter<String> getBf(String name);
Boolean add(RBloomFilter<String> bf, String key);
Boolean contains(RBloomFilter<String> bf, String key);
Boolean delete(String bucket);
}

View File

@@ -15,12 +15,12 @@ public class RedisContainer {
private final RedisConfiguration redisConfiguration;
@Bean
public RedissonClient redissonClient(Config config) {
public RedissonClient redissonClient() {
Config c = new Config();
c.useSingleServer()
.setAddress(String.format("redis://%s:%s", redisConfiguration.getRedisHost(), redisConfiguration.getRedisPort()));
return Redisson.create(config);
return Redisson.create(c);
}

View File

@@ -48,7 +48,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
@@ -63,17 +63,13 @@
<version>2.17.51</version>
</dependency>
<dependency>
<groupId>com.navi.medici.utils</groupId>
<artifactId>event-bus-client</artifactId>
<version>0.1.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.5.2</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,12 @@
package com.navi.medici.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
@Data
public class LitmusConfig {
@Value("${segment.s3.bucket}")
String s3Bucket;
}

View File

@@ -1,23 +1,28 @@
package com.navi.medici.controller.v1;
import com.navi.medici.request.v1.SegmentRequest;
import com.navi.medici.service.SegmentService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
@RestController
@RequiredArgsConstructor
@RequestMapping("/v1/bucket")
@RequestMapping("/v1/segments")
public class SegmentController {
private final SegmentService segmentService;
@PostMapping(value = "/create")
public void createSegment(@RequestParam("file") MultipartFile file,
@RequestParam("segment") SegmentRequest segmentRequest) {
@PostMapping(value = "/create", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public void createSegment(@RequestPart("file") MultipartFile file,
@RequestPart("segment") SegmentRequest segmentRequest) {
segmentService.createSegment(file, segmentRequest);
}
}

View File

@@ -0,0 +1,42 @@
package com.navi.medici.s3;
import com.navi.medici.config.LitmusConfig;
import java.io.FileWriter;
import java.io.IOException;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
@Service
@Log4j2
@RequiredArgsConstructor
public class S3Service {
private final S3Client s3Client;
private final LitmusConfig litmusConfig;
public void upload(MultipartFile file, String destinationBucket) {
try (FileWriter writer = new FileWriter("/tmp/"+destinationBucket)) {
writer.write(new String(file.getBytes()));
} catch (IOException e) {
log.error("file upload failed", e);
}
// try {
// PutObjectRequest putOb = PutObjectRequest.builder()
// .bucket(litmusConfig.getS3Bucket())
// .key(destinationBucket)
// .build();
//
// PutObjectResponse response = s3Client.putObject(putOb,
// RequestBody.fromBytes(file.getBytes()));
// } catch (IOException e) {
// log.error("s3 upload failed", e);
// }
}
}

View File

@@ -0,0 +1,11 @@
package com.navi.medici.service;
import com.navi.medici.request.v1.SegmentRequest;
import org.springframework.web.multipart.MultipartFile;
public interface SegmentService {
void createSegment(MultipartFile file, SegmentRequest segmentRequest);
void segmentIdExist(String segmentName, String id);
}

View File

@@ -0,0 +1,60 @@
package com.navi.medici.service;
import com.navi.medici.command.CacheCommands;
import com.navi.medici.entity.SegmentEntity;
import com.navi.medici.query.segment.ISegmentQuery;
import com.navi.medici.request.v1.SegmentCsv;
import com.navi.medici.request.v1.SegmentRequest;
import com.navi.medici.s3.S3Service;
import com.navi.medici.util.CsvUtil;
import java.util.List;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
@Service
@Log4j2
@RequiredArgsConstructor
public class SegmentServiceImpl implements SegmentService {
private final CsvUtil csvUtil;
private final S3Service s3Service;
private final CacheCommands cacheCommands;
private final ISegmentQuery segmentQuery;
@Override
public void createSegment(MultipartFile file, SegmentRequest segmentRequest) {
String destinationBucketName = UUID.randomUUID().toString();
s3Service.upload(file, destinationBucketName);
List<SegmentCsv> segmentCsvList = csvUtil.csvToList(file);
var bfCreated = cacheCommands.reserveBF(segmentRequest.getName());
var bloomFilter = cacheCommands.getBf(segmentRequest.getName());
log.info("ingesting data to bloom filter. segment_name: {}", segmentRequest.getName());
segmentCsvList.forEach(segment ->{
var temp = cacheCommands.add(bloomFilter, segment.getSegmentId());
log.info("key: {}, result: {}", segment.getSegmentId(), temp);
});
var segmentEntity = SegmentEntity.builder()
.segmentId(UUID.randomUUID().toString())
.segmentName(segmentRequest.getName())
.segmentDescription(segmentRequest.getDescription())
.segmentType(segmentRequest.getSegmentType())
.destinationBucket(destinationBucketName)
.build();
segmentQuery.save(segmentEntity);
}
@Override
public void segmentIdExist(String segmentName, String id) {
var bloomFilter = cacheCommands.getBf(segmentName);
var result = cacheCommands.contains(bloomFilter, id);
}
}

View File

@@ -0,0 +1,34 @@
package com.navi.medici.util;
import com.navi.medici.request.v1.SegmentCsv;
import com.opencsv.bean.CsvToBeanBuilder;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
@Component
@Log4j2
public class CsvUtil {
public List<SegmentCsv> csvToList(MultipartFile file) {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(file.getInputStream(), StandardCharsets.UTF_8));
var cb = new CsvToBeanBuilder(reader)
.withType(SegmentCsv.class)
.withIgnoreLeadingWhiteSpace(true)
.build();
return cb.parse();
} catch (IOException e) {
log.error("data csv read failed. error: {}", e.getMessage());
throw new RuntimeException("data csv read failed.", e);
}
}
}

View File

@@ -0,0 +1,47 @@
package com.navi.medici.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.CollectionType;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Log4j2
public class JacksonUtils {
private final ObjectMapper objectMapper;
public String objectToString(Object o) {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
throw new RuntimeException("object to string conversion failed", e);
}
}
public <T> T stringToObject(String s, Class<T> klazz) {
try {
return objectMapper.readValue(s, klazz);
} catch (JsonProcessingException e) {
throw new RuntimeException("string to object conversion failed", e);
}
}
public <T> List<T> stringToListObject(String s, Class<T> klazz) {
try {
CollectionType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, klazz);
return objectMapper.readValue(s, listType);
} catch (JsonProcessingException e) {
throw new RuntimeException("string to list object conversion failed", e);
}
}
public <T> T convertValue(Object o, Class<T> klazz) {
return objectMapper.convertValue(o, klazz);
}
}

View File

@@ -1,7 +1,7 @@
spring.datasource.hikari.maximum-pool-size=${DB_POOL_MAX_SIZE:2}
spring.datasource.hikari.minimum-idle=${DB_POOL_MIN_IDLE:1}
spring.datasource.hikari.idle-timeout=${DB_POOL_IDLE_TIMEOUT_IN_MS:30000}
spring.datasource.url=${DATASOURCE_URL:jdbc:postgresql://localhost:5432/santa}?stringtype=unspecified
spring.datasource.url=${DATASOURCE_URL:jdbc:postgresql://localhost:5432/litmus}?stringtype=unspecified
spring.datasource.username=${DATASOURCE_USERNAME:postgres}
spring.datasource.password=${DATASOURCE_PASSWORD:admin}
spring.datasource.initialization-mode=${DATA_INITIALIZATION_MODE:never}
@@ -15,4 +15,13 @@ spring.jpa.hibernate.ddl-auto=none
kafka.servers=${KAFKA_SERVER:localhost:9092}
audit.kafka.servers=${AUDIT_KAFKA_SERVER:localhost:9092}
kafka.auditlog.topic=${AUDIT_LOG_TOPIC:audit-logs}
kms.base-url=${KMS_BASE_URL:http://google.com}
kms.base-url=${KMS_BASE_URL:http://google.com}
redis.host=127.0.0.1
redis.port=6379
redis.expected.insertions=99999
redis.false.probability=0.001
segment.s3.bucket=navi-test
report.s3.region=temp

View File

@@ -25,10 +25,9 @@
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
<groupId>com.navi.medici</groupId>
<artifactId>litmus-model</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -1,8 +1,11 @@
package com.navi.medici.entity;
import com.navi.medici.enums.ExperimentType;
import java.time.LocalDateTime;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
@@ -53,7 +56,8 @@ public class ExperimentEntity {
String variants;
@Column(name = "type")
String type;
@Enumerated(EnumType.STRING)
ExperimentType type;
@Column(name = "start_time")
LocalDateTime startTime;

View File

@@ -1,13 +1,17 @@
package com.navi.medici.entity;
import com.navi.medici.enums.SegmentType;
import java.time.LocalDateTime;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Version;
import javax.swing.text.Segment;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -44,6 +48,10 @@ public class SegmentEntity {
@Column(name = "bucket")
String destinationBucket;
@Column(name = "segment_type")
@Enumerated(EnumType.STRING)
SegmentType segmentType;
@Version
private Integer version;

View File

@@ -0,0 +1,29 @@
package com.navi.medici.query.experiment;
import com.navi.medici.entity.ExperimentEntity;
import com.navi.medici.repository.ExperimentRepository;
import java.util.List;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class ExperimentQueryImpl implements IExperimentQuery {
private final ExperimentRepository experimentRepository;
@Override
public List<ExperimentEntity> findByEnabled(Boolean enabled) {
return experimentRepository.findByEnabled(enabled);
}
@Override
public Optional<ExperimentEntity> findByName(String name) {
return experimentRepository.findByName(name);
}
@Override
public Optional<ExperimentEntity> findByExperimentId(String experimentId) {
return experimentRepository.findByExperimentId(experimentId);
}
}

View File

@@ -0,0 +1,13 @@
package com.navi.medici.query.experiment;
import com.navi.medici.entity.ExperimentEntity;
import java.util.List;
import java.util.Optional;
public interface IExperimentQuery {
List<ExperimentEntity> findByEnabled(Boolean enabled);
Optional<ExperimentEntity> findByName(String name);
Optional<ExperimentEntity> findByExperimentId(String experimentId);
}

View File

@@ -0,0 +1,12 @@
package com.navi.medici.query.segment;
import com.navi.medici.entity.SegmentEntity;
import java.util.Optional;
public interface ISegmentQuery {
Optional<SegmentEntity> findBySegmentId(String segmentId);
void save(SegmentEntity segmentEntity);
Optional<SegmentEntity> findBySegmentName(String segmentName);
}

View File

@@ -0,0 +1,28 @@
package com.navi.medici.query.segment;
import com.navi.medici.entity.SegmentEntity;
import com.navi.medici.repository.SegmentRepository;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class SegmentQueryImpl implements ISegmentQuery{
private final SegmentRepository segmentRepository;
@Override
public Optional<SegmentEntity> findBySegmentId(String segmentId) {
return segmentRepository.findBySegmentId(segmentId);
}
@Override
public void save(SegmentEntity segmentEntity) {
segmentRepository.save(segmentEntity);
}
@Override
public Optional<SegmentEntity> findBySegmentName(String segmentName) {
return segmentRepository.findBySegmentName(segmentName);
}
}

View File

@@ -0,0 +1,16 @@
package com.navi.medici.repository;
import com.navi.medici.entity.ExperimentEntity;
import java.util.List;
import java.util.Optional;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface ExperimentRepository extends CrudRepository<ExperimentEntity, Long> {
List<ExperimentEntity> findByEnabled(Boolean enabled);
Optional<ExperimentEntity> findByName(String name);
Optional<ExperimentEntity> findByExperimentId(String experimentId);
}

View File

@@ -0,0 +1,13 @@
package com.navi.medici.repository;
import com.navi.medici.entity.SegmentEntity;
import java.util.Optional;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface SegmentRepository extends CrudRepository<SegmentEntity, Long> {
Optional<SegmentEntity> findBySegmentId(String segmentId);
Optional<SegmentEntity> findBySegmentName(String segmentName);
}

View File

@@ -15,11 +15,49 @@
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.liquibase</groupId>
<artifactId>liquibase-core</artifactId>
<scope>runtime</scope>
<version>4.3.5</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.20</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>11</source>
<target>11</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,13 +0,0 @@
package com.navi.medici;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}

View File

@@ -0,0 +1,12 @@
package com.navi.medici;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Database {
public static void main( String[] args ) {
SpringApplication.run(Database.class, args);
}
}

View File

@@ -0,0 +1,10 @@
spring.datasource.hikari.maximum-pool-size=${DB_POOL_MAX_SIZE:2}
spring.datasource.hikari.minimum-idle=${DB_POOL_MIN_IDLE:1}
spring.datasource.hikari.idle-timeout=${DB_POOL_IDLE_TIMEOUT_IN_MS:30000}
spring.datasource.url=${DATASOURCE_URL:jdbc:postgresql://localhost:5432/litmus}?stringtype=unspecified
spring.datasource.username=${DATASOURCE_USERNAME:postgres}
spring.datasource.password=${DATASOURCE_PASSWORD:admin}
spring.datasource.initialization-mode=${DATA_INITIALIZATION_MODE:never}
spring.jpa.hibernate.ddl-auto=none
spring.liquibase.change-log=classpath:db/liquibase-changelog.yaml
spring.main.web-application-type=NONE

View File

@@ -0,0 +1,18 @@
--liquibase formatted sql
--changeset author:chandresh id:202110051558
CREATE TABLE segments (
id SERIAL PRIMARY KEY,
segment_id varchar(36) NOT NULL,
name varchar(100) NOT NULL,
description text NOT NULL,
bucket varchar(100) NOT NULL,
segment_type varchar(20) NOT NULL,
version BIGINT,
created_at timestamp,
updated_at timestamp
);
CREATE INDEX idx_segment_id ON segments(segment_id);
CREATE INDEX idx_name ON segments(name);

View File

@@ -0,0 +1,3 @@
databaseChangeLog:
- includeAll:
path: db/changelog/

View File

@@ -46,13 +46,12 @@
<version>2.13.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.5.2</version>
</dependency>
</dependencies>

View File

@@ -0,0 +1,7 @@
package com.navi.medici.enums;
public enum ExperimentType {
EXPERIMENT,
RELEASE,
KILL_SWITCH
}

View File

@@ -0,0 +1,6 @@
package com.navi.medici.enums;
public enum SegmentType {
USER_ID,
DEVICE_ID
}

View File

@@ -0,0 +1,17 @@
package com.navi.medici.request.v1;
import com.opencsv.bean.CsvBindByName;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.FieldDefaults;
@AllArgsConstructor
@Getter
@NoArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
public class SegmentCsv {
@CsvBindByName(column = "segment_id")
String segmentId;
}

View File

@@ -1,6 +1,8 @@
package com.navi.medici.request.v1;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.navi.medici.enums.SegmentType;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -17,10 +19,12 @@ import lombok.experimental.FieldDefaults;
@JsonIgnoreProperties(ignoreUnknown = true)
@FieldDefaults(level = AccessLevel.PRIVATE)
public class SegmentRequest {
@JsonProperty("name")
String name;
@JsonProperty("description")
String description;
@JsonProperty("segment_type")
SegmentType segmentType;
}

View File

@@ -1,3 +1,3 @@
DATASOURCE_URL=jdbc:postgresql://dev-db-service-db.np.navi-tech.in:5432/santa
DATASOURCE_URL=jdbc:postgresql://dev-db-service-db.np.navi-tech.in:5432/litmus
DATASOURCE_USERNAME=service_user
DATASOURCE_PASSWORD=JRCFCMXUXBJHGZVTPBNTXHYCCFVMWN

View File

@@ -31,6 +31,11 @@
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>