Flink使用RestApi

   日期:2020-08-06     浏览:162    评论:0    
核心提示:flink是一个非常好用的流任务计算框架, 这次我们来试用flink的restApi来提交任务.1, 上传jar包 public static boolean uploadJar( File jarFile) { RequestBody requestBody = new MultipartBody.Builder() .setType(MultipartBody.FORM) .addFormDataPart(fil

flink是一个非常好用的流任务计算框架, 这次我们来试用flink的restApi来提交任务. 主要阐述几个常用的restapi, 包括上传jar包, 查询jar包, 提交任务, 查询任务, 删除任务等,
其它的比如删除jar包, 查询jobmanager, 查询taskmanager等等, 类推就可以得出了, 不在这里进行重复介绍了

1, 上传jar包

   public static boolean uploadJar( File jarFile) {
        RequestBody requestBody = new MultipartBody.Builder()
                .setType(MultipartBody.FORM)
                .addFormDataPart("file", jarFile.getName(),
                    RequestBody.create(MediaType.parse("multipart/form-data"), jarFile))
                .build();

        Request request = new Request.Builder()
                .url("http://host:port/jars/upload")
                .addHeader(userAgent, userAgentVal)
                .post(requestBody)
                .build();
       Response resp = OkHttpUtils.execute(request);
            if (OK == resp.code()) {
                    JSONObject body = JSON.parseObject(resp.body().string());
                    if ("success".equals(body.getString("status"))) {
                        return true;
                }
            }
        return false;
    }

2, 查询jar包

  Request request = new Request.Builder()
                .url("http://host:port/jars")
                .addHeader(userAgent, userAgentVal)
                .get()
                .build();
  Response response = OkHttpUtils.execute(request);
  String body = response.body().string();    

3,提交任务
(特别提示: 提交任务时, Main方法中,容易出现参数解析异常, 为了解决这一个问题, 强烈建议, 对参数进行编码转换, 对programArgs参数进行URLEncoder.encode(参数值, “utf-8”), 然后再在flink运行jar包, 进行解码.

       String baseUrl = "http://host:port/jars/${jarId}/run";
        Map<String, String> params = new HashMap<>();
        params.put("programArgs", "xxxxxx");
        params.put("entryClass", "com.xx.oo.JsonMain");
        params.put("parallelism", "2");
        params.put("savepointPath", null);
        Request request = new Request.Builder()
                .url(baseUrl)
                .addHeader(userAgent, userAgentVal)
                .post(RequestBody.create(JSON.toJSONString(params), MEDIA_TYPE_JSON))
                .build();
        Response resp = OkHttpUtils.execute(request);
        String respBody = resp.body().string();
        if (OK == resp.code()) {
                JSONObject body = JSON.parseObject(respBody);
                return body.getString("jobid");
        }

4,查询任务

       String url= "http://host:port/jobs";
       Request request = new Request.Builder()
                .url(url)
                .addHeader(userAgent, userAgentVal)
                .get()
                .build();
        Response resp = OkHttpUtils.execute(request);
        if (OK == resp.code()) {
            JSONObject body = JSON.parseObject(resp.body().string());
            if (body.containsKey("jobs")) {
                JSONArray jobs = body.getJSONArray("jobs");
                for (int i = 0; i < jobs.size(); i++) {
                    JSONObject jsb = jobs.getJSONObject(i);
                    String id = jsb.getString("id");
                    String status = jsb.getString("status");
                }
            }
        }else{
            logger.error("queryJobByHttp "+resp.body().string());
        }

4,删除任务

       String url= "http://host:port/jobs/${jobId}";
        Request request = new Request.Builder()
                .url(baseUrl)
                .addHeader(userAgent, userAgentVal)
                .patch(RequestBody.create("{}", MEDIA_TYPE_JSON))
                .build();
            Response resp = OkHttpUtils.execute(request);
            if (ACCEPTED == resp.code()) {
                return jobId;
            }
        return null;
 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服