Today, I present a modified HDFS sink for Flume, purely a prototype, that creates one file per event. The sink assumes that each event carries a header naming its destination file, either set when the event is ingested or added later by an interceptor.
First, we define a new configuration variable that tells a particular HDFS sink to create one file per event:
```properties
agent.sinks.hdfs-sink.hdfs.singleBucket = true
```
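To make the scope of this key concrete, here is a small, self-contained sketch that reads the flag from agent-style properties text with java.util.Properties. It is a simulation, not Flume code. It assumes Flume hands each sink only the keys below agent.sinks.<sinkName>., so the sink itself would see hdfs.singleBucket; inside HDFSEventSink.configure() the equivalent read would presumably be context.getBoolean("hdfs.singleBucket", false). The class and method names are hypothetical, and the default of false is an assumption chosen so that existing sinks keep their normal behavior.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class SingleBucketConfig {
    /**
     * Reads the (hypothetical) singleBucket flag for one sink from agent
     * properties text. The full key is "<agent>.sinks.<sink>.hdfs.singleBucket";
     * a missing key defaults to false so other sinks are unaffected.
     */
    static boolean readSingleBucket(String agentProperties, String agentName, String sinkName)
            throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(agentProperties));
        String prefix = agentName + ".sinks." + sinkName + ".";
        String value = props.getProperty(prefix + "hdfs.singleBucket", "false");
        return Boolean.parseBoolean(value.trim());
    }

    public static void main(String[] args) throws IOException {
        String conf = "agent.sinks.hdfs-sink.hdfs.singleBucket = true\n";
        System.out.println(readSingleBucket(conf, "agent", "hdfs-sink"));  // true
        System.out.println(readSingleBucket(conf, "agent", "other-sink")); // false
    }
}
```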
Next, let's modify the HDFSEventSink that ships with Flume so that it uses this variable to decide whether to perform this alternate type of write:
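```java
// Extract headers.
Map<String, String> headers = event.getHeaders();
String destinationName = headers.get("destinationName");

// Write one file per event only when singleBucket is enabled and the
// event names its destination; otherwise fall back to the normal bucket.
if (singleBucket && destinationName != null) {
  bucketWriter.appendSingle(event, destinationName);
} else {
  bucketWriter.append(event);
}
```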
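The branch above can be isolated as a pure function, which makes its fallback behavior easy to see: an event without a destinationName header is silently appended to the normal rolling bucket rather than rejected, even when singleBucket is enabled. This is a standalone sketch; the WritePath enum and the choose method are hypothetical names, not part of Flume.

```java
import java.util.HashMap;
import java.util.Map;

final class SingleBucketRouting {
    // Hypothetical names for the two write paths in the modified sink.
    enum WritePath { SINGLE_FILE, APPEND_TO_BUCKET }

    // Mirrors the sink's branch: one file per event only when the flag is on
    // AND the event carries a "destinationName" header.
    static WritePath choose(boolean singleBucket, Map<String, String> headers) {
        String destinationName = headers.get("destinationName");
        return (singleBucket && destinationName != null)
                ? WritePath.SINGLE_FILE
                : WritePath.APPEND_TO_BUCKET;
    }

    public static void main(String[] args) {
        Map<String, String> withHeader = new HashMap<>();
        withHeader.put("destinationName", "report-42.json");
        System.out.println(choose(true, withHeader));      // SINGLE_FILE
        System.out.println(choose(false, withHeader));     // APPEND_TO_BUCKET
        System.out.println(choose(true, new HashMap<>())); // APPEND_TO_BUCKET
    }
}
```

If a missing header should be treated as an error instead, the else branch in the sink is the place to count or reject such events.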
Now, let's implement the BucketWriter method that creates one file per event, named by the header attached to the event:
```java
/**
 * Write a single event to its own file in HDFS.
 *
 * The event is written to filePath/destinationName + inUseSuffix and renamed
 * to filePath/destinationName once the writer has been closed.
 *
 * @param event the event to write
 * @param destinationName the file name taken from the event's header
 * @throws IOException if the file cannot be opened, written, or renamed
 * @throws InterruptedException if the calling thread is interrupted
 */
public synchronized void appendSingle(final Event event, final String destinationName)
    throws IOException, InterruptedException {
  if ((filePath == null) || (writer == null)) {
    throw new IOException("Invalid file settings");
  }

  final Configuration config = new Configuration();
  // disable FileSystem JVM shutdown hook
  config.setBoolean("fs.automatic.close", false);

  synchronized (staticLock) {
    checkAndThrowInterruptedException();

    if (destinationName != null) {
      try {
        bucketPath = filePath + "/" + destinationName + inUseSuffix;
        targetPath = filePath + "/" + destinationName;

        LOG.info("Creating " + bucketPath);
        callWithTimeout(new CallRunner<Void>() {
          @Override
          public Void call() throws Exception {
            // Open. Get a reference to the FS using the above config before
            // the underlying writer does, to avoid the shutdown hook and
            // IllegalStateExceptions.
            fileSystem = new Path(bucketPath).getFileSystem(config);
            if (codeC == null) {
              writer.open(bucketPath);
            } else {
              writer.open(bucketPath, codeC, compType);
            }

            // Increment counters.
            sinkCounter.incrementConnectionCreatedCount();
            resetCounters();

            // Write.
            sinkCounter.incrementEventDrainAttemptCount();
            writer.append(event);
            writer.sync();

            // Close.
            writer.close();
            sinkCounter.incrementConnectionClosedCount();

            // Rename from the in-use name to the final name.
            renameBucket();
            return null;
          }
        });
      } catch (Exception ex) {
        sinkCounter.incrementConnectionFailedCount();
        if (ex instanceof IOException) {
          throw (IOException) ex;
        } else if (ex instanceof InterruptedException) {
          // Preserve interruption instead of wrapping it in a RuntimeException.
          throw (InterruptedException) ex;
        } else {
          throw Throwables.propagate(ex);
        }
      }
    }
  }
}
```
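The method above follows the same protocol the stock HDFS sink uses for its rolled files: write under an in-use name, close, then rename to the final name, so downstream readers only ever see complete files. The sketch below reproduces that protocol on the local filesystem with java.nio.file rather than the Hadoop FileSystem API. The class name, the ".tmp" constant (mirroring Flume's default hdfs.inUseSuffix), and the choice of ATOMIC_MOVE are assumptions for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class OneFilePerEvent {
    // Assumed in-use suffix: the file is "in use" until it is renamed.
    static final String IN_USE_SUFFIX = ".tmp";

    /** Writes one event body to dir/destinationName via a temporary in-use file. */
    static Path writeSingle(Path dir, String destinationName, byte[] body) throws IOException {
        Path bucketPath = dir.resolve(destinationName + IN_USE_SUFFIX);
        Path targetPath = dir.resolve(destinationName);
        // Open, write, and close in one call (creates or truncates the file).
        Files.write(bucketPath, body);
        // Rename; ATOMIC_MOVE throws instead of falling back to copy-and-delete.
        return Files.move(bucketPath, targetPath, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("single-bucket");
        Path out = writeSingle(dir, "event-1.txt", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(out.getFileName());                            // event-1.txt
        System.out.println(Files.exists(dir.resolve("event-1.txt.tmp"))); // false
    }
}
```

In this sketch, if a file named destinationName already exists, the Files.move Javadoc states it is implementation specific whether an atomic move replaces it or fails with an IOException, so unique header values are the safer assumption.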
The code for this prototype is available on GitHub.