Modified Flume HDFS sink for one-to-one files

26 Jun 2013

Today, I present a modified HDFS sink for Flume, purely a prototype, that creates exactly one file per event. The sink assumes that each event carries a header holding the destination filename, set either at ingestion or later by an interceptor.
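To make the header assumption concrete, here is a minimal sketch of how an upstream step might stamp each event's headers. The class and method names are hypothetical, and a plain Map stands in for Flume's event headers; only the header key, destinationName, comes from the sink code in this post.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Flume or the prototype. */
final class DestinationHeaders {
  // Header key that the modified sink looks up.
  static final String DESTINATION_HEADER = "destinationName";

  private DestinationHeaders() {}

  /**
   * Returns a copy of the given headers with destinationName set to the
   * base name of sourcePath (everything after the last '/').
   */
  static Map<String, String> withDestination(Map<String, String> headers, String sourcePath) {
    Map<String, String> copy = new HashMap<>(headers);
    int slash = sourcePath.lastIndexOf('/');
    // lastIndexOf returns -1 when there is no '/', so substring(0) keeps the whole name.
    copy.put(DESTINATION_HEADER, sourcePath.substring(slash + 1));
    return copy;
  }
}
```

In a real agent this logic would most likely live in a custom interceptor or in the client that builds the events.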

First, we define a new configuration variable that tells a particular HDFS sink to create one file per event.

agent.sinks.hdfs-sink.hdfs.singleBucket = true
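As a rough sketch of how the flag could be read, the snippet below parses it with java.util.Properties and defaults to false when it is absent. This is a stand-in, not the prototype's code: inside Flume the sink would more likely read the value from its Context in configure(), and the class name here is hypothetical.

```java
import java.util.Properties;

/** Hypothetical stand-in for the sink's configuration lookup. */
final class SingleBucketFlag {
  // Full property key as it appears in the agent configuration file.
  static final String KEY = "agent.sinks.hdfs-sink.hdfs.singleBucket";

  private SingleBucketFlag() {}

  /** Returns true only when the property is present and equals "true" (case-insensitive). */
  static boolean isEnabled(Properties props) {
    return Boolean.parseBoolean(props.getProperty(KEY, "false").trim());
  }
}
```

Defaulting to false keeps existing sinks on the standard bucketed write path unless they opt in.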

Now, let’s modify the HDFS event sink that ships with Flume so that this configuration variable determines whether to perform the alternate type of write.

// Extract headers.
Map<String, String> headers = event.getHeaders();
String destinationName = headers.get("destinationName");

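if (singleBucket && destinationName != null) {
  // One file per event, named after the header value.
  bucketWriter.appendSingle(event, destinationName);
} else {
  // ... existing bucketed write path, unchanged ...
}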

Now, let’s implement the writer to create one file per event, based on the header that’s been appended to the event object.

/**
 * Write a single object to HDFS.
 * @throws IOException
 * @throws InterruptedException
 */
public synchronized void appendSingle(final Event event, final String destinationName)
  throws IOException, InterruptedException {
  if ((filePath == null) || (writer == null)) {
    throw new IOException("Invalid file settings");
  }

  final Configuration config = new Configuration();
  // disable FileSystem JVM shutdown hook
  config.setBoolean("fs.automatic.close", false);

  synchronized (staticLock) {

    if(destinationName != null) {
      try {
        bucketPath = filePath + "/" + destinationName + inUseSuffix;
        targetPath = filePath + "/" + destinationName;
        LOG.info("Creating " + bucketPath);

        callWithTimeout(new CallRunner<Void>() {
          public Void call() throws Exception {
            // Open.
            if (codeC == null) {
              // Need to get reference to FS using above config before underlying
              // writer does in order to avoid shutdown hook & IllegalStateExceptions
              fileSystem = new Path(bucketPath).getFileSystem(config);
              writer.open(bucketPath);
            } else {
              // need to get reference to FS before writer does to avoid shutdown hook
              fileSystem = new Path(bucketPath).getFileSystem(config);
              writer.open(bucketPath, codeC, compType);
            }

            // Increment counters.

            // Write.

            // Close.

            // Rename.

            return null;
          }
        });
      } catch (Exception ex) {
        if (ex instanceof IOException) {
          throw (IOException) ex;
        } else {
          throw Throwables.propagate(ex);
        }
      }
    }
  }
}
The code for this prototype is available on GitHub.
